author | Justin Tobler <jtobler@gitlab.com> | 2023-07-21 17:22:43 +0300
---|---|---
committer | Justin Tobler <jtobler@gitlab.com> | 2023-07-21 17:22:43 +0300
commit | 429e9dc6845904f1af2263d420c6c9c64b8de915 (patch) |
tree | 416d452582a4c9c3c7f6f2735b2135d0fec04872 |
parent | 24b83a6dedcad7e6e57ee6932fcf2636065a5be2 (diff) |
parent | c059b0955c700d9307e955b6ae065d3355490c84 (diff) |
Merge branch 'qmnguyen0711/implement-aimd-calculator' into 'master'
Implement adaptive calculator for concurrency limiter
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6058
Merged-by: Justin Tobler <jtobler@gitlab.com>
Approved-by: Justin Tobler <jtobler@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Reviewed-by: Justin Tobler <jtobler@gitlab.com>
Reviewed-by: karthik nayak <knayak@gitlab.com>
Co-authored-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
-rw-r--r-- | internal/limiter/adaptive_calculator.go | 289
-rw-r--r-- | internal/limiter/adaptive_calculator_test.go | 671
-rw-r--r-- | internal/limiter/adaptive_limit.go | 48
3 files changed, 1008 insertions(+), 0 deletions(-)
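Before the diff itself, here is a minimal, runnable sketch of the AIMD re-calibration rule the new calculator applies at each calibration point. This is an illustration only, not the committed implementation: `setting` and `nextLimit` are hypothetical names, while the field names and the increase/decrease rules mirror the code in the diff below.

```go
package main

import (
	"fmt"
	"math"
)

// setting mirrors the AdaptiveSetting fields from the diff below.
type setting struct {
	Initial, Min, Max int
	BackoffFactor     float64
}

// nextLimit computes one calibration step: additive increase (+1, capped at
// Max) when no backoff event was observed, multiplicative decrease
// (scaled by BackoffFactor, floored at Min) otherwise.
func nextLimit(current int, s setting, backoff bool) int {
	if !backoff {
		if next := current + 1; next <= s.Max {
			return next
		}
		return s.Max
	}
	next := int(math.Floor(float64(current) * s.BackoffFactor))
	if next < s.Min {
		return s.Min
	}
	return next
}

func main() {
	s := setting{Initial: 25, Min: 10, Max: 100, BackoffFactor: 0.5}
	limit := s.Initial
	for _, backoff := range []bool{false, false, false, false, true, false} {
		limit = nextLimit(limit, s, backoff)
		fmt.Println(limit) // prints 26 27 28 29 14 15, matching the test expectations below
	}
}
```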
diff --git a/internal/limiter/adaptive_calculator.go b/internal/limiter/adaptive_calculator.go
new file mode 100644
index 000000000..98c01161b
--- /dev/null
+++ b/internal/limiter/adaptive_calculator.go
@@ -0,0 +1,289 @@
+package limiter
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/sirupsen/logrus"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
+)
+
+const (
+	// MaximumWatcherTimeout is the maximum number of consecutive timeouts allowed when polling backoff events
+	// from watchers. When this threshold is reached, the polling timeouts are treated as a backoff event.
+	MaximumWatcherTimeout = 5
+)
+
+// BackoffEvent is a signal that the current system is under pressure. It's returned by the watchers under the
+// management of the AdaptiveCalculator at calibration points.
+type BackoffEvent struct {
+	WatcherName   string
+	ShouldBackoff bool
+	Reason        string
+}
+
+// ResourceWatcher is an interface for the watchers that monitor the system resources.
+type ResourceWatcher interface {
+	// Name returns the name of the resource watcher.
+	Name() string
+	// Poll returns a backoff event when a watcher determines something has gone wrong with the resource it is
+	// monitoring. If everything is fine, it returns `nil`. Watchers are expected to respect the cancellation of
+	// the input context.
+	Poll(context.Context) (*BackoffEvent, error)
+}
+
+// AdaptiveCalculator is responsible for calculating the adaptive limits based on the additive increase/multiplicative
+// decrease (AIMD) algorithm. This method involves gradually increasing the limit during normal process functioning
+// but quickly reducing it when an issue (backoff event) occurs. It receives a list of AdaptiveLimiter and a list of
+// ResourceWatcher. Although the limits may have different settings (Initial, Min, Max, BackoffFactor), they all move
+// as a whole. The caller accesses the current limits via the AdaptiveLimiter.Current method.
+//
+// When the calculator starts, each limit value is set to its Initial limit. Periodically, the calculator polls the
+// backoff events from the watchers. The current value of each limit is re-calibrated as follows:
+// * limit = limit + 1 if there has been no backoff event since the last calibration. The new limit cannot exceed the max limit.
+// * limit = limit * BackoffFactor otherwise. The new limit cannot be lower than the min limit.
+//
+// A watcher returning an error does not count as a backoff event.
+type AdaptiveCalculator struct {
+	sync.Mutex
+
+	logger *logrus.Entry
+	// started tells whether the calculator has already started. A calculator is only allowed to be used once.
+	started bool
+	// calibration is the time duration until the next calibration event.
+	calibration time.Duration
+	// limits are the list of adaptive limits managed by this calculator.
+	limits []AdaptiveLimiter
+	// watchers stores a list of resource watchers that return the backoff events when queried.
+	watchers []ResourceWatcher
+	// watcherTimeouts is a map of counters for consecutive timeouts. A counter is reset when the associated
+	// watcher returns a non-error event or exceeds MaximumWatcherTimeout.
+	watcherTimeouts map[ResourceWatcher]*atomic.Int32
+	// lastBackoffEvent stores the last backoff event collected from the watchers.
+	lastBackoffEvent *BackoffEvent
+	// tickerCreator is a custom function that returns a Ticker. It's mostly used in tests to inject a manual ticker.
+	tickerCreator func(duration time.Duration) helper.Ticker
+
+	// currentLimitVec is the gauge of the current limit value of an adaptive concurrency limit.
+	currentLimitVec *prometheus.GaugeVec
+	// watcherErrorsVec is the counter of the total number of watcher errors.
+	watcherErrorsVec *prometheus.CounterVec
+	// backoffEventsVec is the counter of the total number of backoff events.
+	backoffEventsVec *prometheus.CounterVec
+}
+
+// NewAdaptiveCalculator constructs an AdaptiveCalculator object. It's the responsibility of the caller to validate
+// the correctness of the input AdaptiveLimiter and ResourceWatcher lists.
+func NewAdaptiveCalculator(calibration time.Duration, logger *logrus.Entry, limits []AdaptiveLimiter, watchers []ResourceWatcher) *AdaptiveCalculator {
+	watcherTimeouts := map[ResourceWatcher]*atomic.Int32{}
+	for _, watcher := range watchers {
+		watcherTimeouts[watcher] = &atomic.Int32{}
+	}
+
+	return &AdaptiveCalculator{
+		logger:           logger,
+		calibration:      calibration,
+		limits:           limits,
+		watchers:         watchers,
+		watcherTimeouts:  watcherTimeouts,
+		lastBackoffEvent: nil,
+		currentLimitVec: prometheus.NewGaugeVec(
+			prometheus.GaugeOpts{
+				Name: "gitaly_concurrency_limiting_current_limit",
+				Help: "The current limit value of an adaptive concurrency limit",
+			},
+			[]string{"limit"},
+		),
+		watcherErrorsVec: prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Name: "gitaly_concurrency_limiting_watcher_errors_total",
+				Help: "Counter of the total number of watcher errors",
+			},
+			[]string{"watcher"},
+		),
+		backoffEventsVec: prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Name: "gitaly_concurrency_limiting_backoff_events_total",
+				Help: "Counter of the total number of backoff events",
+			},
+			[]string{"watcher"},
+		),
+	}
+}
+
+// Start resets the current limit values and starts a goroutine to poll the backoff events. This method exits after
+// the mentioned goroutine starts.
+func (c *AdaptiveCalculator) Start(ctx context.Context) (func(), error) {
+	c.Lock()
+	defer c.Unlock()
+
+	if c.started {
+		return nil, fmt.Errorf("adaptive calculator: already started")
+	}
+	c.started = true
+
+	// Reset all limits to their initial limits.
+	for _, limit := range c.limits {
+		c.updateLimit(limit, limit.Setting().Initial)
+	}
+
+	done := make(chan struct{})
+	completed := make(chan struct{})
+
+	go func(ctx context.Context) {
+		ctx, cancel := context.WithCancel(ctx)
+		defer cancel()
+		defer close(completed)
+
+		tickerCreator := c.tickerCreator
+		if tickerCreator == nil {
+			tickerCreator = helper.NewTimerTicker
+		}
+		timer := tickerCreator(c.calibration)
+		for {
+			// Reset the timer to the next calibration point. It accounts for the resource polling latencies.
+			timer.Reset()
+			select {
+			case <-timer.C():
+				// If multiple watchers fire multiple backoff events, the calculator decreases once.
+				// Usually, resources are highly correlated. When the memory level rises too high,
+				// the CPU usage also increases due to page faulting, memory reclaim, GC activities, etc.
+				// We might also have multiple watchers for the same resource, for example, a memory
+				// usage watcher and a page fault counter. Hence, re-calibrating after each event would
+				// cut the limits too aggressively.
+				c.pollBackoffEvent(ctx)
+				c.calibrateLimits(ctx)
+
+				// Reset the backoff event.
+				c.setLastBackoffEvent(nil)
+			case <-done:
+				timer.Stop()
+				return
+			}
+		}
+	}(ctx)
+
+	return func() {
+		close(done)
+		<-completed
+	}, nil
+}
+
+// Describe is used to describe Prometheus metrics.
+func (c *AdaptiveCalculator) Describe(descs chan<- *prometheus.Desc) {
+	prometheus.DescribeByCollect(c, descs)
+}
+
+// Collect is used to collect Prometheus metrics.
+func (c *AdaptiveCalculator) Collect(metrics chan<- prometheus.Metric) {
+	c.currentLimitVec.Collect(metrics)
+	c.watcherErrorsVec.Collect(metrics)
+	c.backoffEventsVec.Collect(metrics)
+}
+
+func (c *AdaptiveCalculator) pollBackoffEvent(ctx context.Context) {
+	// Set a timeout to prevent a resource watcher from running forever. The deadline
+	// is the next calibration event.
+	ctx, cancel := context.WithTimeout(ctx, c.calibration)
+	defer cancel()
+
+	for _, w := range c.watchers {
+		// If the context is cancelled, return early.
+		if ctx.Err() != nil {
+			return
+		}
+
+		logger := c.logger.WithField("watcher", w.Name())
+		event, err := w.Poll(ctx)
+		if err != nil {
+			if err == context.DeadlineExceeded {
+				c.watcherTimeouts[w].Add(1)
+				// If the watcher times out a number of consecutive times, treat that as a
+				// backoff event.
+				if timeoutCount := c.watcherTimeouts[w].Load(); timeoutCount >= MaximumWatcherTimeout {
+					c.setLastBackoffEvent(&BackoffEvent{
+						WatcherName:   w.Name(),
+						ShouldBackoff: true,
+						Reason:        fmt.Sprintf("%d consecutive polling timeout errors", timeoutCount),
+					})
+					// Reset the timeout counter. The next MaximumWatcherTimeout consecutive
+					// timeouts will trigger another backoff event.
+					c.watcherTimeouts[w].Store(0)
+				}
+			}
+
+			if err != context.Canceled {
+				c.watcherErrorsVec.WithLabelValues(w.Name()).Inc()
+				logger.Errorf("poll from resource watcher: %s", err)
+			}
+
+			continue
+		}
+		// Reset the timeout counter if the watcher polls successfully.
+		c.watcherTimeouts[w].Store(0)
+		if event.ShouldBackoff {
+			c.setLastBackoffEvent(event)
+		}
+	}
+}
+
+func (c *AdaptiveCalculator) calibrateLimits(ctx context.Context) {
+	c.Lock()
+	defer c.Unlock()
+
+	if ctx.Err() != nil {
+		return
+	}
+
+	for _, limit := range c.limits {
+		setting := limit.Setting()
+
+		var newLimit int
+		logger := c.logger.WithField("limit", limit.Name())
+
+		if c.lastBackoffEvent == nil {
+			// Additive increase, one unit at a time.
+			newLimit = limit.Current() + 1
+			if newLimit > setting.Max {
+				newLimit = setting.Max
+			}
+			logger.WithFields(map[string]interface{}{
+				"previous_limit": limit.Current(),
+				"new_limit":      newLimit,
+			}).Debugf("Additive increase")
+		} else {
+			// Multiplicative decrease.
+			newLimit = int(math.Floor(float64(limit.Current()) * setting.BackoffFactor))
+			if newLimit < setting.Min {
+				newLimit = setting.Min
+			}
+			logger.WithFields(map[string]interface{}{
+				"previous_limit": limit.Current(),
+				"new_limit":      newLimit,
+				"watcher":        c.lastBackoffEvent.WatcherName,
+				"reason":         c.lastBackoffEvent.Reason,
+			}).Infof("Multiplicative decrease")
+		}
+		c.updateLimit(limit, newLimit)
+	}
+}
+
+func (c *AdaptiveCalculator) setLastBackoffEvent(event *BackoffEvent) {
+	c.Lock()
+	defer c.Unlock()
+
+	c.lastBackoffEvent = event
+	if event != nil && event.ShouldBackoff {
+		c.backoffEventsVec.WithLabelValues(event.WatcherName).Inc()
+	}
+}
+
+func (c *AdaptiveCalculator) updateLimit(limit AdaptiveLimiter, newLimit int) {
+	limit.Update(newLimit)
+	c.currentLimitVec.WithLabelValues(limit.Name()).Set(float64(newLimit))
+}
diff --git a/internal/limiter/adaptive_calculator_test.go b/internal/limiter/adaptive_calculator_test.go
new file mode 100644
index 000000000..a89445211
--- /dev/null
+++ b/internal/limiter/adaptive_calculator_test.go
@@ -0,0 +1,671 @@
+package limiter
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/sirupsen/logrus"
+	"github.com/sirupsen/logrus/hooks/test"
+	"github.com/stretchr/testify/require"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+)
+
+func TestAdaptiveCalculator_alreadyStarted(t *testing.T) {
+	t.Parallel()
+
+	calculator := NewAdaptiveCalculator(10*time.Millisecond, testhelper.NewDiscardingLogEntry(t), nil, nil)
+
+	stop, err := calculator.Start(testhelper.Context(t))
+	require.NoError(t, err)
+
+	stop2, err := calculator.Start(testhelper.Context(t))
+	require.Errorf(t, err, "adaptive calculator: already started")
+	require.Nil(t, stop2)
+
+	stop()
+}
+
+func TestAdaptiveCalculator_realTimerTicker(t *testing.T) {
+	t.Parallel()
+
+	logger, hook := test.NewNullLogger()
+	logger.SetLevel(logrus.InfoLevel)
+
+	limit := newTestLimit("testLimit", 25, 100, 10, 0.5)
+	watcher := newTestWatcher("testWatcher", []string{"", "", "", "", ""}, nil)
+
+	calibration := 10 * time.Millisecond
+	calculator := NewAdaptiveCalculator(calibration, logger.WithContext(testhelper.Context(t)), []AdaptiveLimiter{limit}, []ResourceWatcher{watcher})
+
+	stop, err := calculator.Start(testhelper.Context(t))
+	require.NoError(t, err)
+	time.Sleep(10 * calibration)
+	stop()
+
+	require.Equal(t, []int{25, 26, 27, 28, 29, 30}, limit.currents[:6])
+	assertLogs(t, []string{}, hook.AllEntries())
+}
+
+func TestAdaptiveCalculator(t *testing.T) {
+	t.Parallel()
+
+	tests := []struct {
+		desc       string
+		limits     []AdaptiveLimiter
+		watchers   []ResourceWatcher
+		waitEvents int
+		// The first captured limit is the initial limit.
+		expectedLimits  map[string][]int
+		expectedLogs    []string
+		expectedMetrics string
+	}{
+		{
+			desc:       "Empty watchers",
+			waitEvents: 5,
+			limits: []AdaptiveLimiter{
+				newTestLimit("testLimit", 25, 100, 10, 0.5),
+			},
+			watchers: []ResourceWatcher{},
+			expectedLimits: map[string][]int{
+				"testLimit": {25, 26, 27, 28, 29, 30},
+			},
+			expectedMetrics: `# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit"} {testLimit}
+
+`,
+		},
+		{
+			desc:           "Empty limits and watchers",
+			waitEvents:     5,
+			limits:         []AdaptiveLimiter{},
+			watchers:       []ResourceWatcher{},
+			expectedLimits: map[string][]int{},
+		},
+		{
+			desc:       "Additive increase",
+			waitEvents: 5,
+			limits: []AdaptiveLimiter{
+				newTestLimit("testLimit", 25, 100, 10, 0.5),
+			},
+			watchers: []ResourceWatcher{
+				newTestWatcher("testWatcher", []string{"", "", "", "", ""}, nil),
+			},
+			expectedLimits: map[string][]int{
+				"testLimit": {25, 26, 27, 28, 29, 30},
+			},
+			expectedMetrics: `# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit"} {testLimit}
+
+`,
+		},
+		{
+			desc:       "Additive increase until reaching the max limit",
+			waitEvents: 5,
+			limits: []AdaptiveLimiter{
+				newTestLimit("testLimit", 25, 27, 10, 0.5),
+			},
+			watchers: []ResourceWatcher{
+				newTestWatcher("testWatcher", []string{"", "", "", "", ""}, nil),
+			},
+			expectedLimits: map[string][]int{
+				"testLimit": {25, 26, 27, 27, 27, 27},
+			},
+			// In this test, the current limit never exceeds the max value, so there is no need to
+			// replace the placeholder.
+			expectedMetrics: `# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit"} 27
+
+`,
+		},
+		{
+			desc:       "Additive increase until a backoff event",
+			waitEvents: 7,
+			limits: []AdaptiveLimiter{
+				newTestLimit("testLimit", 25, 100, 10, 0.5),
+			},
+			watchers: []ResourceWatcher{
+				newTestWatcher("testWatcher", []string{"", "", "", "", "cgroup exceeds limit", "", ""}, nil),
+			},
+			expectedLimits: map[string][]int{
+				"testLimit": {25, 26, 27, 28, 29, 14, 15, 16},
+			},
+			expectedLogs: []string{
+				`level=info msg="Multiplicative decrease" limit=testLimit new_limit=14 previous_limit=29 reason="cgroup exceeds limit" watcher=testWatcher`,
+			},
+			expectedMetrics: `# HELP gitaly_concurrency_limiting_backoff_events_total Counter of the total number of backoff events
+# TYPE gitaly_concurrency_limiting_backoff_events_total counter
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher"} 1
+# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit"} {testLimit}
+
+`,
+		},
+		{
+			desc:       "Multiplicative decrease until reaching min limit",
+			waitEvents: 6,
+			limits: []AdaptiveLimiter{
+				newTestLimit("testLimit", 25, 100, 10, 0.5),
+			},
+			watchers: []ResourceWatcher{
+				newTestWatcher("testWatcher", []string{"", "", "reason 1", "reason 2", "reason 3", ""}, nil),
+			},
+			expectedLimits: map[string][]int{
+				"testLimit": {25, 26, 27, 13, 10, 10, 11},
+			},
+			expectedLogs: []string{
+				`level=info msg="Multiplicative decrease" limit=testLimit new_limit=13 previous_limit=27 reason="reason 1" watcher=testWatcher`,
+				`level=info msg="Multiplicative decrease" limit=testLimit new_limit=10 previous_limit=13 reason="reason 2" watcher=testWatcher`,
+				`level=info msg="Multiplicative decrease" limit=testLimit new_limit=10 previous_limit=10 reason="reason 3" watcher=testWatcher`,
+			},
+			expectedMetrics: `# HELP gitaly_concurrency_limiting_backoff_events_total Counter of the total number of backoff events
+# TYPE gitaly_concurrency_limiting_backoff_events_total counter
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher"} 3
+# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit"} {testLimit}
+
+`,
+		},
+		{
+			desc:       "Additive increase multiple limits",
+			waitEvents: 5,
+			limits: []AdaptiveLimiter{
+				newTestLimit("testLimit1", 25, 100, 10, 0.5),
+				newTestLimit("testLimit2", 15, 30, 10, 0.5),
+			},
+			watchers: []ResourceWatcher{
+				newTestWatcher("testWatcher", []string{"", "", "", "", ""}, nil),
+			},
+			expectedLimits: map[string][]int{
+				"testLimit1": {25, 26, 27, 28, 29, 30},
+				"testLimit2": {15, 16, 17, 18, 19, 20},
+			},
+			expectedMetrics: `# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+
+`,
+		},
+		{
+			desc:       "Additive increase multiple limits until a backoff event",
+			waitEvents: 7,
+			limits: []AdaptiveLimiter{
+				newTestLimit("testLimit1", 25, 100, 10, 0.5),
+				newTestLimit("testLimit2", 15, 30, 10, 0.5),
+			},
+			watchers: []ResourceWatcher{
+				newTestWatcher("testWatcher", []string{"", "", "", "", "", "cgroup exceeds limit", ""}, nil),
+			},
+			expectedLimits: map[string][]int{
+				"testLimit1": {25, 26, 27, 28, 29, 30, 15, 16},
+				"testLimit2": {15, 16, 17, 18, 19, 20, 10, 11},
+			},
+			expectedLogs: []string{
+				`level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=15 previous_limit=30 reason="cgroup exceeds limit" watcher=testWatcher`,
+				`level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=20 reason="cgroup exceeds limit" watcher=testWatcher`,
+			},
+			expectedMetrics: `# HELP gitaly_concurrency_limiting_backoff_events_total Counter of the total number of backoff events
+# TYPE gitaly_concurrency_limiting_backoff_events_total counter
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher"} 1
+# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+
+`,
+		},
+		{
+			desc:       "Additive increase multiple limits until a backoff event with multiple watchers",
+			waitEvents: 10,
+			limits: []AdaptiveLimiter{
+				newTestLimit("testLimit1", 25, 100, 10, 0.5),
+				newTestLimit("testLimit2", 15, 30, 10, 0.5),
+			},
+			watchers: []ResourceWatcher{
+				newTestWatcher("testWatcher1", []string{"", "", "", "", "", "", "", "", "cgroup exceeds limit 1", ""}, nil),
+				newTestWatcher("testWatcher2", []string{"", "", "", "", "", "", "", "", "", ""}, nil),
+				newTestWatcher("testWatcher3", []string{"", "", "cgroup exceeds limit 2", "", "", "", "", "", ""}, nil),
+			},
+			expectedLimits: map[string][]int{
+				"testLimit1": {25, 26, 27, 13, 14, 15, 16, 17, 18, 10, 11},
+				"testLimit2": {15, 16, 17, 10, 11, 12, 13, 14, 15, 10, 11},
+			},
+			expectedLogs: []string{
+				`level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=13 previous_limit=27 reason="cgroup exceeds limit 2" watcher=testWatcher3`,
+				`level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=17 reason="cgroup exceeds limit 2" watcher=testWatcher3`,
+				`level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=10 previous_limit=18 reason="cgroup exceeds limit 1" watcher=testWatcher1`,
+				`level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=15 reason="cgroup exceeds limit 1" watcher=testWatcher1`,
+			},
+			expectedMetrics: `# HELP gitaly_concurrency_limiting_backoff_events_total Counter of the total number of backoff events
+# TYPE gitaly_concurrency_limiting_backoff_events_total counter
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher1"} 1
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher3"} 1
+# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+
+`,
+		},
+		{
+			desc:       "Additive increase multiple limits until multiple watchers return multiple errors",
+			waitEvents: 5,
+			limits: []AdaptiveLimiter{
+				newTestLimit("testLimit1", 25, 100, 10, 0.5),
+				newTestLimit("testLimit2", 15, 30, 10, 0.5),
+			},
+			watchers: []ResourceWatcher{
+				newTestWatcher("testWatcher1", []string{"", "", "cgroup exceeds limit 1", "", ""}, nil),
+				newTestWatcher("testWatcher2", []string{"", "", "", "", ""}, nil),
+				newTestWatcher("testWatcher3", []string{"", "", "cgroup exceeds limit 2", "", ""}, nil),
+			},
+			expectedLimits: map[string][]int{
+				"testLimit1": {25, 26, 27, 13, 14, 15},
+				"testLimit2": {15, 16, 17, 10, 11, 12},
+			},
+			expectedLogs: []string{
+				`level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=13 previous_limit=27 reason="cgroup exceeds limit 2" watcher=testWatcher3`,
+				`level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=17 reason="cgroup exceeds limit 2" watcher=testWatcher3`,
+			},
+			expectedMetrics: `# HELP gitaly_concurrency_limiting_backoff_events_total Counter of the total number of backoff events
+# TYPE gitaly_concurrency_limiting_backoff_events_total counter
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher1"} 1
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher3"} 1
+# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+
+`,
+		},
+		{
+			desc:       "a watcher returns an error",
+			waitEvents: 5,
+			limits: []AdaptiveLimiter{
+				newTestLimit("testLimit1", 25, 100, 10, 0.5),
+				newTestLimit("testLimit2", 15, 30, 10, 0.5),
+			},
+			watchers: []ResourceWatcher{
+				newTestWatcher("testWatcher1", []string{"", "", "", "", ""}, []error{nil, nil, nil, nil, nil}),
+				newTestWatcher("testWatcher2", []string{"", "", "", "", ""}, []error{nil, nil, nil, fmt.Errorf("unexpected"), nil}),
+				newTestWatcher("testWatcher3", []string{"", "", "", "", ""}, []error{nil, fmt.Errorf("unexpected"), nil, nil, nil}),
+			},
+			expectedLimits: map[string][]int{
+				"testLimit1": {25, 26, 27, 28, 29, 30},
+				"testLimit2": {15, 16, 17, 18, 19, 20},
+			},
+			expectedLogs: []string{
+				`level=error msg="poll from resource watcher: unexpected" watcher=testWatcher3`,
+				`level=error msg="poll from resource watcher: unexpected" watcher=testWatcher2`,
+			},
+			expectedMetrics: `# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+# HELP gitaly_concurrency_limiting_watcher_errors_total Counter of the total number of watcher errors
+# TYPE gitaly_concurrency_limiting_watcher_errors_total counter
+gitaly_concurrency_limiting_watcher_errors_total{watcher="testWatcher2"} 1
+gitaly_concurrency_limiting_watcher_errors_total{watcher="testWatcher3"} 1
+
+`,
+		},
+		{
+			desc:       "a watcher returns an error at the same time another watcher returns a backoff event",
+			waitEvents: 5,
+			limits: []AdaptiveLimiter{
+				newTestLimit("testLimit1", 25, 100, 10, 0.5),
+				newTestLimit("testLimit2", 15, 30, 10, 0.5),
+			},
+			watchers: []ResourceWatcher{
+				newTestWatcher("testWatcher1", []string{"", "", "", "backoff please", ""}, []error{nil, nil, nil, nil, nil}),
+				newTestWatcher("testWatcher2", []string{"", "", "", "", ""}, []error{nil, nil, nil, fmt.Errorf("unexpected"), nil}),
+				newTestWatcher("testWatcher3", []string{"", "", "", "", ""}, []error{nil, fmt.Errorf("unexpected"), nil, nil, nil}),
+			},
+			expectedLimits: map[string][]int{
+				"testLimit1": {25, 26, 27, 28, 14, 15},
+				"testLimit2": {15, 16, 17, 18, 10, 11},
+			},
+			expectedLogs: []string{
+				`level=error msg="poll from resource watcher: unexpected" watcher=testWatcher3`,
+				`level=error msg="poll from resource watcher: unexpected" watcher=testWatcher2`,
+				`level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=14 previous_limit=28 reason="backoff please" watcher=testWatcher1`,
+				`level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=18 reason="backoff please" watcher=testWatcher1`,
+			},
+			expectedMetrics: `# HELP gitaly_concurrency_limiting_backoff_events_total Counter of the total number of backoff events
+# TYPE gitaly_concurrency_limiting_backoff_events_total counter
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher1"} 1
+# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+# HELP gitaly_concurrency_limiting_watcher_errors_total Counter of the total number of watcher errors
+# TYPE gitaly_concurrency_limiting_watcher_errors_total counter
+gitaly_concurrency_limiting_watcher_errors_total{watcher="testWatcher2"} 1
+gitaly_concurrency_limiting_watcher_errors_total{watcher="testWatcher3"} 1
+
+`,
+		},
+		{
+			desc:       "a watcher returns context canceled error",
+			waitEvents: 5,
+			limits: []AdaptiveLimiter{
+				newTestLimit("testLimit1", 25, 100, 10, 0.5),
+				newTestLimit("testLimit2", 15, 30, 10, 0.5),
+			},
+			watchers: []ResourceWatcher{
+				newTestWatcher("testWatcher1", []string{"", "", "", "", ""}, []error{nil, nil, nil, nil, context.Canceled}),
+				newTestWatcher("testWatcher2", []string{"", "", "", "", ""}, []error{nil, nil, nil, nil, context.Canceled}),
+				newTestWatcher("testWatcher3", []string{"", "", "", "", ""}, []error{nil, nil, nil, nil, context.Canceled}),
+			},
+			expectedLimits: map[string][]int{
+				"testLimit1": {25, 26, 27, 28, 29, 30},
+				"testLimit2": {15, 16, 17, 18, 19, 20},
+			},
+			expectedLogs: []string{},
+			expectedMetrics: `# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+
+`,
+		},
+		{
+			desc:       "a watcher returns some timeout errors",
+			waitEvents: 5,
+			limits: []AdaptiveLimiter{
+				newTestLimit("testLimit1", 25, 100, 10, 0.5),
+				newTestLimit("testLimit2", 15, 30, 10, 0.5),
+			},
+			watchers: []ResourceWatcher{
+				newTestWatcher("testWatcher", []string{"", "", "", "", ""}, []error{nil, context.DeadlineExceeded, context.DeadlineExceeded, nil, nil}),
+			},
+			expectedLimits: map[string][]int{
+				"testLimit1": {25, 26, 27, 28, 29, 30},
+				"testLimit2": {15, 16, 17, 18, 19, 20},
+			},
+			expectedLogs: []string{
+				// Not enough consecutive timeouts to trigger a backoff event.
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+			},
+			expectedMetrics: `# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+# HELP gitaly_concurrency_limiting_watcher_errors_total Counter of the total number of watcher errors
+# TYPE gitaly_concurrency_limiting_watcher_errors_total counter
+gitaly_concurrency_limiting_watcher_errors_total{watcher="testWatcher"} 2
+
+`,
+		},
+		{
+			desc:       "a watcher returns 5 consecutive timeout errors",
+			waitEvents: 6,
+			limits: []AdaptiveLimiter{
+				newTestLimit("testLimit1", 25, 100, 10, 0.5),
+				newTestLimit("testLimit2", 15, 30, 10, 0.5),
+			},
+			watchers: []ResourceWatcher{
+				newTestWatcher(
+					"testWatcher",
+					[]string{"", "", "", "", "", ""},
+					[]error{context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, nil},
+				),
+			},
+			expectedLimits: map[string][]int{
+				"testLimit1": {25, 26, 27, 28, 29, 14, 15},
+				"testLimit2": {15, 16, 17, 18, 19, 10, 11},
+			},
+			expectedLogs: []string{
+				// The last timeout triggers a backoff event, then the limits increase again.
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+				`level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=14 previous_limit=29 reason="5 consecutive polling timeout errors" watcher=testWatcher`,
+				`level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=19 reason="5 consecutive polling timeout errors" watcher=testWatcher`,
+			},
+			expectedMetrics: `# HELP gitaly_concurrency_limiting_backoff_events_total Counter of the total number of backoff events
+# TYPE gitaly_concurrency_limiting_backoff_events_total counter
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher"} 1
+# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+# HELP gitaly_concurrency_limiting_watcher_errors_total Counter of the total number of watcher errors
+# TYPE gitaly_concurrency_limiting_watcher_errors_total counter
+gitaly_concurrency_limiting_watcher_errors_total{watcher="testWatcher"} 5
+
+`,
+		},
+		{
+			desc:       "a watcher returns 6 consecutive timeout errors",
+			waitEvents: 7,
+			limits: []AdaptiveLimiter{
+				newTestLimit("testLimit1", 25, 100, 10, 0.5),
+				newTestLimit("testLimit2", 15, 30, 10, 0.5),
+			},
+			watchers: []ResourceWatcher{
+				newTestWatcher(
+					"testWatcher",
+					[]string{"", "", "", "", "", "", ""},
+					[]error{context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, nil},
+				),
+			},
+			expectedLimits: map[string][]int{
+				// The second-to-last timeout triggers an event; the last timeout does not trigger
+				// another one because the counter was reset.
+				"testLimit1": {25, 26, 27, 28, 29, 14, 15, 16},
+				"testLimit2": {15, 16, 17, 18, 19, 10, 11, 12},
+			},
+			expectedLogs: []string{
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+				`level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=14 previous_limit=29 reason="5 consecutive polling timeout errors" watcher=testWatcher`,
+				`level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=19 reason="5 consecutive polling timeout errors" watcher=testWatcher`,
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher`,
+			},
+			expectedMetrics: `# HELP gitaly_concurrency_limiting_backoff_events_total Counter of the total number of backoff events
+# TYPE gitaly_concurrency_limiting_backoff_events_total counter
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher"} 1
+# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+# HELP gitaly_concurrency_limiting_watcher_errors_total Counter of the total number of watcher errors
+# TYPE gitaly_concurrency_limiting_watcher_errors_total counter
+gitaly_concurrency_limiting_watcher_errors_total{watcher="testWatcher"} 6
+
+`,
+		},
+		{
+			desc:       "multiple watchers return 5 consecutive timeout errors",
+			waitEvents: 6,
+			limits: []AdaptiveLimiter{
+				newTestLimit("testLimit1", 25, 100, 10, 0.5),
+				newTestLimit("testLimit2", 15, 30, 10, 0.5),
+			},
+			watchers: []ResourceWatcher{
+				newTestWatcher(
+					"testWatcher1",
+					[]string{"", "", "", "", "", ""},
+					[]error{context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, nil},
+				),
+				newTestWatcher(
+					"testWatcher2",
+					[]string{"", "", "", "", "", ""},
+					[]error{context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, context.DeadlineExceeded, nil},
+				),
+			},
+			expectedLimits: map[string][]int{
+				"testLimit1": {25, 26, 27, 28, 29, 14, 15},
+				"testLimit2": {15, 16, 17, 18, 19, 10, 11},
+			},
+			expectedLogs: []string{
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher1`,
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher2`,
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher1`,
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher2`,
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher1`,
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher2`,
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher1`,
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher2`,
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher1`,
+				`level=error msg="poll from resource watcher: context deadline exceeded" watcher=testWatcher2`,
+				`level=info msg="Multiplicative decrease" limit=testLimit1 new_limit=14 previous_limit=29 reason="5 consecutive polling timeout errors" watcher=testWatcher2`,
+				`level=info msg="Multiplicative decrease" limit=testLimit2 new_limit=10 previous_limit=19 reason="5 consecutive polling timeout errors" watcher=testWatcher2`,
+			},
+			expectedMetrics: `# HELP gitaly_concurrency_limiting_backoff_events_total Counter of the total number of backoff events
+# TYPE gitaly_concurrency_limiting_backoff_events_total counter
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher1"} 1
+gitaly_concurrency_limiting_backoff_events_total{watcher="testWatcher2"} 1
+# HELP gitaly_concurrency_limiting_current_limit The current limit value of an adaptive concurrency limit
+# TYPE gitaly_concurrency_limiting_current_limit gauge
+gitaly_concurrency_limiting_current_limit{limit="testLimit1"} {testLimit1}
+gitaly_concurrency_limiting_current_limit{limit="testLimit2"} {testLimit2}
+# HELP gitaly_concurrency_limiting_watcher_errors_total Counter of the total number of watcher errors
+# TYPE gitaly_concurrency_limiting_watcher_errors_total counter
+gitaly_concurrency_limiting_watcher_errors_total{watcher="testWatcher1"} 5
+gitaly_concurrency_limiting_watcher_errors_total{watcher="testWatcher2"} 5
+
+`,
+		},
+	}
+	for _, tc := range tests {
+		t.Run(tc.desc, func(t *testing.T) {
+			logger, hook := test.NewNullLogger()
+			hook.Reset()
+			logger.SetLevel(logrus.InfoLevel)
+
+			ticker := helper.NewManualTicker()
+
+			calibration := 10 * time.Millisecond
+			calculator := NewAdaptiveCalculator(calibration, logger.WithContext(testhelper.Context(t)), tc.limits, tc.watchers)
+			calculator.tickerCreator = func(duration time.Duration) helper.Ticker { return ticker }
+
+			stop, err := calculator.Start(testhelper.Context(t))
+			require.NoError(t, err)
+			for i := 0; i <= tc.waitEvents; i++ {
+				ticker.Tick()
+			}
+			stop()
+
+			for name, expectedLimits := range tc.expectedLimits {
+				limit := findLimitWithName(tc.limits, name)
+				require.NotNil(t, limit, "could not find limit with name %q", name)
+				require.Equal(t, expectedLimits, limit.currents[:tc.waitEvents+1])
+			}
+
+			// Replace the current limit placeholders in the metrics with the limits' final values.
+			// The test setup above adds some time buffer, so there might be some extra calibrations
+			// after the test finishes.
+			metrics := tc.expectedMetrics
+			for _, l := range tc.limits {
+				metrics = strings.Replace(metrics, fmt.Sprintf("{%s}", l.Name()), fmt.Sprintf("%d", l.Current()), -1)
+			}
+			assertLogs(t, tc.expectedLogs, hook.AllEntries())
+			require.NoError(t, testutil.CollectAndCompare(calculator, strings.NewReader(metrics),
+				"gitaly_concurrency_limiting_current_limit",
+				"gitaly_concurrency_limiting_backoff_events_total",
+				"gitaly_concurrency_limiting_watcher_errors_total",
+			))
+		})
+	}
+}
+
+func assertLogs(t *testing.T, expectedLogs []string, entries []*logrus.Entry) {
+	require.Equal(t, len(expectedLogs), len(entries))
+	for index, expectedLog := range expectedLogs {
+		msg, err := entries[index].String()
+		require.NoError(t, err)
+		require.Contains(t, msg, expectedLog)
+	}
+}
+
+func findLimitWithName(limits []AdaptiveLimiter, name string) *testLimit {
+	for _, l := range limits {
+		limit := l.(*testLimit)
+		if limit.name == name {
+			return limit
+		}
+	}
+	return nil
+}
+
+type testLimit struct {
+	currents      []int
+	name          string
+	initial       int
+	max           int
+	min           int
+	backoffFactor float64
+}
+
+func newTestLimit(name string, initial int, max int, min int, backoff float64) *testLimit {
+	return &testLimit{name: name, initial: initial, max: max, min: min, backoffFactor: backoff}
+}
+
+func (l *testLimit) Name() string { return l.name }
+func (l *testLimit) Current() int {
+	if len(l.currents) == 0 {
+		return 0
+	}
+	return l.currents[len(l.currents)-1]
+}
+func (l *testLimit) Update(val int) { l.currents = append(l.currents, val) }
+func (l *testLimit) Setting() AdaptiveSetting {
+	return AdaptiveSetting{
+		Initial:       l.initial,
+		Max:           l.max,
+		Min:           l.min,
+		BackoffFactor: l.backoffFactor,
+	}
+}
+
+type testWatcher struct {
+	name   string
+	events []*BackoffEvent
+	errors []error
+	index  int
+}
+
+func (w *testWatcher) Name() string {
+	return w.name
+}
+
+func (w *testWatcher) Poll(context.Context) (*BackoffEvent, error) {
+	index := w.index
+	if index >= len(w.events) {
+		index = len(w.events) - 1
+	}
+	var err error
+	if w.errors != nil {
+		err = w.errors[index]
+	}
+	w.index++
+	return w.events[index], err
+}
+
+func newTestWatcher(name string, reasons []string, errors []error) *testWatcher {
+	var events []*BackoffEvent
+	for _, reason := range reasons {
+		event := &BackoffEvent{
+			ShouldBackoff: reason != "",
+			Reason:        reason,
+			WatcherName:   name,
+		}
+		events = append(events, event)
+	}
+	return &testWatcher{name: name, events: events, errors: errors}
+}
diff --git a/internal/limiter/adaptive_limit.go b/internal/limiter/adaptive_limit.go
new file mode 100644
index 000000000..2aefe5eb0
--- /dev/null
+++ b/internal/limiter/adaptive_limit.go
@@ -0,0 +1,48 @@
+package limiter
+
+import "sync/atomic"
+
+// AdaptiveSetting is a struct that holds the configuration parameters for an adaptive limiter.
+type AdaptiveSetting struct {
+	Initial       int
+	Max           int
+	Min           int
+	BackoffFactor float64
+}
+
+// AdaptiveLimiter is an interface for managing and updating adaptive limits.
+// It exposes methods to get the name and the current limit value, update the limit value, and access its settings.
+type AdaptiveLimiter interface {
+	Name() string
+	Current() int
+	Update(val int)
+	Setting() AdaptiveSetting
+}
+
+// AdaptiveLimit is an implementation of the AdaptiveLimiter interface. It uses an atomic Int32 to represent the
+// current limit value, ensuring thread-safe updates.
+type AdaptiveLimit struct {
+	name    string
+	current atomic.Int32
+	setting AdaptiveSetting
+}
+
+// Name returns the name of the adaptive limit.
+func (l *AdaptiveLimit) Name() string {
+	return l.name
+}
+
+// Current returns the current limit. This function can be called without the need for synchronization.
+func (l *AdaptiveLimit) Current() int {
+	return int(l.current.Load())
+}
+
+// Update adjusts the current limit value.
+func (l *AdaptiveLimit) Update(val int) {
+	l.current.Store(int32(val))
+}
+
+// Setting returns the configuration parameters for an adaptive limiter.
+func (l *AdaptiveLimit) Setting() AdaptiveSetting {
+	return l.setting
+}
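For context, wiring the new pieces together would look roughly like the following — a hypothetical sketch, not part of the merge. It assumes code living in package limiter itself (the diff ships no exported constructor for AdaptiveLimit, so the struct literal below relies on package-internal field access), and `someWatcher` stands in for any ResourceWatcher implementation:

```go
package limiter

import (
	"context"
	"time"

	"github.com/sirupsen/logrus"
)

// runExampleCalculator is a hypothetical helper showing how a caller inside
// package limiter might wire one limit and one watcher into the calculator.
func runExampleCalculator(ctx context.Context, someWatcher ResourceWatcher) (stop func(), err error) {
	// The limit starts at Initial, grows additively up to Max, and is cut by
	// BackoffFactor (floored at Min) whenever a backoff event fires.
	limit := &AdaptiveLimit{
		name:    "exampleLimit", // hypothetical limit name
		setting: AdaptiveSetting{Initial: 20, Max: 100, Min: 5, BackoffFactor: 0.5},
	}

	calculator := NewAdaptiveCalculator(
		30*time.Second, // calibration interval, chosen arbitrarily for this sketch
		logrus.NewEntry(logrus.StandardLogger()),
		[]AdaptiveLimiter{limit},
		[]ResourceWatcher{someWatcher},
	)

	// Start launches the calibration goroutine; the returned function stops it.
	return calculator.Start(ctx)
}
```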