diff options
Diffstat (limited to 'workhorse/internal/queueing/queue.go')
-rw-r--r-- | workhorse/internal/queueing/queue.go | 201 |
1 files changed, 201 insertions, 0 deletions
diff --git a/workhorse/internal/queueing/queue.go b/workhorse/internal/queueing/queue.go new file mode 100644 index 00000000000..db082cf19c6 --- /dev/null +++ b/workhorse/internal/queueing/queue.go @@ -0,0 +1,201 @@ +package queueing + +import ( + "errors" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type errTooManyRequests struct{ error } +type errQueueingTimedout struct{ error } + +var ErrTooManyRequests = &errTooManyRequests{errors.New("too many requests queued")} +var ErrQueueingTimedout = &errQueueingTimedout{errors.New("queueing timedout")} + +type queueMetrics struct { + queueingLimit prometheus.Gauge + queueingQueueLimit prometheus.Gauge + queueingQueueTimeout prometheus.Gauge + queueingBusy prometheus.Gauge + queueingWaiting prometheus.Gauge + queueingWaitingTime prometheus.Histogram + queueingErrors *prometheus.CounterVec +} + +// newQueueMetrics prepares Prometheus metrics for queueing mechanism +// name specifies name of the queue, used to label metrics with ConstLabel `queue_name` +// Don't call newQueueMetrics twice with the same name argument! +// timeout specifies the timeout of storing a request in queue - queueMetrics +// uses it to calculate histogram buckets for gitlab_workhorse_queueing_waiting_time +// metric +func newQueueMetrics(name string, timeout time.Duration) *queueMetrics { + waitingTimeBuckets := []float64{ + timeout.Seconds() * 0.01, + timeout.Seconds() * 0.05, + timeout.Seconds() * 0.10, + timeout.Seconds() * 0.25, + timeout.Seconds() * 0.50, + timeout.Seconds() * 0.75, + timeout.Seconds() * 0.90, + timeout.Seconds() * 0.95, + timeout.Seconds() * 0.99, + timeout.Seconds(), + } + + metrics := &queueMetrics{ + queueingLimit: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "gitlab_workhorse_queueing_limit", + Help: "Current limit set for the queueing mechanism", + ConstLabels: prometheus.Labels{ + "queue_name": name, + }, + }), + + queueingQueueLimit: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "gitlab_workhorse_queueing_queue_limit", + Help: "Current queueLimit set for the queueing mechanism", + ConstLabels: prometheus.Labels{ + "queue_name": name, + }, + }), + + queueingQueueTimeout: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "gitlab_workhorse_queueing_queue_timeout", + Help: "Current queueTimeout set for the queueing mechanism", + ConstLabels: prometheus.Labels{ + "queue_name": name, + }, + }), + + queueingBusy: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "gitlab_workhorse_queueing_busy", + Help: "How many queued requests are now processed", + ConstLabels: prometheus.Labels{ + "queue_name": name, + }, + }), + + queueingWaiting: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "gitlab_workhorse_queueing_waiting", + Help: "How many requests are now queued", + ConstLabels: prometheus.Labels{ + "queue_name": name, + }, + }), + + queueingWaitingTime: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "gitlab_workhorse_queueing_waiting_time", + Help: "How many time a request spent in queue", + ConstLabels: prometheus.Labels{ + "queue_name": name, + }, + Buckets: waitingTimeBuckets, + }), + + queueingErrors: promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitlab_workhorse_queueing_errors", + Help: "How many times the TooManyRequests or QueueintTimedout errors were returned while queueing, partitioned by error type", + ConstLabels: prometheus.Labels{ + "queue_name": name, + }, + }, + []string{"type"}, + ), + } + + return metrics +} + +type Queue struct { + *queueMetrics + + name string + busyCh chan struct{} + waitingCh chan time.Time + timeout time.Duration +} + +// newQueue creates a new queue +// name specifies name used to label queue metrics. +// Don't call newQueue twice with the same name argument! +// limit specifies number of requests run concurrently +// queueLimit specifies maximum number of requests that can be queued +// timeout specifies the time limit of storing the request in the queue +// if the number of requests is above the limit +func newQueue(name string, limit, queueLimit uint, timeout time.Duration) *Queue { + queue := &Queue{ + name: name, + busyCh: make(chan struct{}, limit), + waitingCh: make(chan time.Time, limit+queueLimit), + timeout: timeout, + } + + queue.queueMetrics = newQueueMetrics(name, timeout) + queue.queueingLimit.Set(float64(limit)) + queue.queueingQueueLimit.Set(float64(queueLimit)) + queue.queueingQueueTimeout.Set(timeout.Seconds()) + + return queue +} + +// Acquire takes one slot from the Queue +// and returns when a request should be processed +// it allows up to (limit) of requests running at a time +// it allows to queue up to (queue-limit) requests +func (s *Queue) Acquire() (err error) { + // push item to a queue to claim your own slot (non-blocking) + select { + case s.waitingCh <- time.Now(): + s.queueingWaiting.Inc() + break + default: + s.queueingErrors.WithLabelValues("too_many_requests").Inc() + return ErrTooManyRequests + } + + defer func() { + if err != nil { + waitStarted := <-s.waitingCh + s.queueingWaiting.Dec() + s.queueingWaitingTime.Observe(float64(time.Since(waitStarted).Seconds())) + } + }() + + // fast path: push item to current processed items (non-blocking) + select { + case s.busyCh <- struct{}{}: + s.queueingBusy.Inc() + return nil + default: + break + } + + timer := time.NewTimer(s.timeout) + defer timer.Stop() + + // push item to current processed items (blocking) + select { + case s.busyCh <- struct{}{}: + s.queueingBusy.Inc() + return nil + + case <-timer.C: + s.queueingErrors.WithLabelValues("queueing_timedout").Inc() + return ErrQueueingTimedout + } +} + +// Release marks the finish of processing of requests +// It triggers next request to be processed if it's in queue +func (s *Queue) Release() { + // dequeue from queue to allow next request to be processed + waitStarted := <-s.waitingCh + s.queueingWaiting.Dec() + s.queueingWaitingTime.Observe(float64(time.Since(waitStarted).Seconds())) + + <-s.busyCh + s.queueingBusy.Dec() +} |