gitlab.com/gitlab-org/gitlab-foss.git

Diffstat (limited to 'workhorse/internal/queueing/queue.go')
-rw-r--r--  workhorse/internal/queueing/queue.go  201
1 file changed, 201 insertions(+), 0 deletions(-)
diff --git a/workhorse/internal/queueing/queue.go b/workhorse/internal/queueing/queue.go
new file mode 100644
index 00000000000..db082cf19c6
--- /dev/null
+++ b/workhorse/internal/queueing/queue.go
@@ -0,0 +1,201 @@
+package queueing
+
+import (
+ "errors"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+type errTooManyRequests struct{ error }
+type errQueueingTimedout struct{ error }
+
+var ErrTooManyRequests = &errTooManyRequests{errors.New("too many requests queued")}
+var ErrQueueingTimedout = &errQueueingTimedout{errors.New("queueing timed out")}
+
+type queueMetrics struct {
+ queueingLimit prometheus.Gauge
+ queueingQueueLimit prometheus.Gauge
+ queueingQueueTimeout prometheus.Gauge
+ queueingBusy prometheus.Gauge
+ queueingWaiting prometheus.Gauge
+ queueingWaitingTime prometheus.Histogram
+ queueingErrors *prometheus.CounterVec
+}
+
+// newQueueMetrics prepares Prometheus metrics for the queueing mechanism.
+// name specifies the name of the queue and is used to label metrics with the ConstLabel `queue_name`.
+// Don't call newQueueMetrics twice with the same name argument!
+// timeout specifies how long a request may be stored in the queue; queueMetrics
+// uses it to calculate histogram buckets for the gitlab_workhorse_queueing_waiting_time
+// metric.
+func newQueueMetrics(name string, timeout time.Duration) *queueMetrics {
+ waitingTimeBuckets := []float64{
+ timeout.Seconds() * 0.01,
+ timeout.Seconds() * 0.05,
+ timeout.Seconds() * 0.10,
+ timeout.Seconds() * 0.25,
+ timeout.Seconds() * 0.50,
+ timeout.Seconds() * 0.75,
+ timeout.Seconds() * 0.90,
+ timeout.Seconds() * 0.95,
+ timeout.Seconds() * 0.99,
+ timeout.Seconds(),
+ }
+
+ metrics := &queueMetrics{
+ queueingLimit: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "gitlab_workhorse_queueing_limit",
+ Help: "Current limit set for the queueing mechanism",
+ ConstLabels: prometheus.Labels{
+ "queue_name": name,
+ },
+ }),
+
+ queueingQueueLimit: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "gitlab_workhorse_queueing_queue_limit",
+ Help: "Current queueLimit set for the queueing mechanism",
+ ConstLabels: prometheus.Labels{
+ "queue_name": name,
+ },
+ }),
+
+ queueingQueueTimeout: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "gitlab_workhorse_queueing_queue_timeout",
+ Help: "Current queueTimeout set for the queueing mechanism",
+ ConstLabels: prometheus.Labels{
+ "queue_name": name,
+ },
+ }),
+
+ queueingBusy: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "gitlab_workhorse_queueing_busy",
+ Help: "How many queued requests are now processed",
+ ConstLabels: prometheus.Labels{
+ "queue_name": name,
+ },
+ }),
+
+ queueingWaiting: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "gitlab_workhorse_queueing_waiting",
+ Help: "How many requests are now queued",
+ ConstLabels: prometheus.Labels{
+ "queue_name": name,
+ },
+ }),
+
+ queueingWaitingTime: promauto.NewHistogram(prometheus.HistogramOpts{
+ Name: "gitlab_workhorse_queueing_waiting_time",
+ Help: "How many time a request spent in queue",
+ ConstLabels: prometheus.Labels{
+ "queue_name": name,
+ },
+ Buckets: waitingTimeBuckets,
+ }),
+
+ queueingErrors: promauto.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitlab_workhorse_queueing_errors",
+ Help: "How many times the TooManyRequests or QueueintTimedout errors were returned while queueing, partitioned by error type",
+ ConstLabels: prometheus.Labels{
+ "queue_name": name,
+ },
+ },
+ []string{"type"},
+ ),
+ }
+
+ return metrics
+}
+
+type Queue struct {
+ *queueMetrics
+
+ name string
+ busyCh chan struct{}
+ waitingCh chan time.Time
+ timeout time.Duration
+}
+
+// newQueue creates a new queue.
+// name specifies the name used to label queue metrics.
+// Don't call newQueue twice with the same name argument!
+// limit specifies the number of requests processed concurrently.
+// queueLimit specifies the maximum number of requests that can be queued.
+// timeout specifies how long a request may wait in the queue
+// when the number of concurrently processed requests is at the limit.
+func newQueue(name string, limit, queueLimit uint, timeout time.Duration) *Queue {
+ queue := &Queue{
+ name: name,
+ busyCh: make(chan struct{}, limit),
+ waitingCh: make(chan time.Time, limit+queueLimit),
+ timeout: timeout,
+ }
+
+ queue.queueMetrics = newQueueMetrics(name, timeout)
+ queue.queueingLimit.Set(float64(limit))
+ queue.queueingQueueLimit.Set(float64(queueLimit))
+ queue.queueingQueueTimeout.Set(timeout.Seconds())
+
+ return queue
+}
+
+// Acquire takes one slot from the Queue
+// and returns when the request should be processed.
+// It allows up to (limit) requests to run at a time
+// and up to (queue-limit) requests to wait in the queue.
+func (s *Queue) Acquire() (err error) {
+ // push item to a queue to claim your own slot (non-blocking)
+ select {
+ case s.waitingCh <- time.Now():
+ s.queueingWaiting.Inc()
+ break
+ default:
+ s.queueingErrors.WithLabelValues("too_many_requests").Inc()
+ return ErrTooManyRequests
+ }
+
+ defer func() {
+ if err != nil {
+ waitStarted := <-s.waitingCh
+ s.queueingWaiting.Dec()
+ s.queueingWaitingTime.Observe(float64(time.Since(waitStarted).Seconds()))
+ }
+ }()
+
+ // fast path: push item to current processed items (non-blocking)
+ select {
+ case s.busyCh <- struct{}{}:
+ s.queueingBusy.Inc()
+ return nil
+ default:
+ break
+ }
+
+ timer := time.NewTimer(s.timeout)
+ defer timer.Stop()
+
+ // push item to current processed items (blocking)
+ select {
+ case s.busyCh <- struct{}{}:
+ s.queueingBusy.Inc()
+ return nil
+
+ case <-timer.C:
+ s.queueingErrors.WithLabelValues("queueing_timedout").Inc()
+ return ErrQueueingTimedout
+ }
+}
+
+// Release marks the completion of processing a request.
+// It allows the next queued request, if any, to be processed.
+func (s *Queue) Release() {
+ // dequeue from queue to allow next request to be processed
+ waitStarted := <-s.waitingCh
+ s.queueingWaiting.Dec()
+ s.queueingWaitingTime.Observe(float64(time.Since(waitStarted).Seconds()))
+
+ <-s.busyCh
+ s.queueingBusy.Dec()
+}