Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'workhorse/internal/queueing')
-rw-r--r--workhorse/internal/queueing/queue.go201
-rw-r--r--workhorse/internal/queueing/queue_test.go62
-rw-r--r--workhorse/internal/queueing/requests.go51
-rw-r--r--workhorse/internal/queueing/requests_test.go76
4 files changed, 390 insertions, 0 deletions
diff --git a/workhorse/internal/queueing/queue.go b/workhorse/internal/queueing/queue.go
new file mode 100644
index 00000000000..db082cf19c6
--- /dev/null
+++ b/workhorse/internal/queueing/queue.go
@@ -0,0 +1,201 @@
+package queueing
+
+import (
+ "errors"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+type errTooManyRequests struct{ error }
+type errQueueingTimedout struct{ error }
+
+var ErrTooManyRequests = &errTooManyRequests{errors.New("too many requests queued")}
+var ErrQueueingTimedout = &errQueueingTimedout{errors.New("queueing timedout")}
+
+type queueMetrics struct {
+ queueingLimit prometheus.Gauge
+ queueingQueueLimit prometheus.Gauge
+ queueingQueueTimeout prometheus.Gauge
+ queueingBusy prometheus.Gauge
+ queueingWaiting prometheus.Gauge
+ queueingWaitingTime prometheus.Histogram
+ queueingErrors *prometheus.CounterVec
+}
+
+// newQueueMetrics prepares Prometheus metrics for queueing mechanism
+// name specifies name of the queue, used to label metrics with ConstLabel `queue_name`
+// Don't call newQueueMetrics twice with the same name argument!
+// timeout specifies the timeout of storing a request in queue - queueMetrics
+// uses it to calculate histogram buckets for gitlab_workhorse_queueing_waiting_time
+// metric
+func newQueueMetrics(name string, timeout time.Duration) *queueMetrics {
+ waitingTimeBuckets := []float64{
+ timeout.Seconds() * 0.01,
+ timeout.Seconds() * 0.05,
+ timeout.Seconds() * 0.10,
+ timeout.Seconds() * 0.25,
+ timeout.Seconds() * 0.50,
+ timeout.Seconds() * 0.75,
+ timeout.Seconds() * 0.90,
+ timeout.Seconds() * 0.95,
+ timeout.Seconds() * 0.99,
+ timeout.Seconds(),
+ }
+
+ metrics := &queueMetrics{
+ queueingLimit: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "gitlab_workhorse_queueing_limit",
+ Help: "Current limit set for the queueing mechanism",
+ ConstLabels: prometheus.Labels{
+ "queue_name": name,
+ },
+ }),
+
+ queueingQueueLimit: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "gitlab_workhorse_queueing_queue_limit",
+ Help: "Current queueLimit set for the queueing mechanism",
+ ConstLabels: prometheus.Labels{
+ "queue_name": name,
+ },
+ }),
+
+ queueingQueueTimeout: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "gitlab_workhorse_queueing_queue_timeout",
+ Help: "Current queueTimeout set for the queueing mechanism",
+ ConstLabels: prometheus.Labels{
+ "queue_name": name,
+ },
+ }),
+
+ queueingBusy: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "gitlab_workhorse_queueing_busy",
+ Help: "How many queued requests are now processed",
+ ConstLabels: prometheus.Labels{
+ "queue_name": name,
+ },
+ }),
+
+ queueingWaiting: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "gitlab_workhorse_queueing_waiting",
+ Help: "How many requests are now queued",
+ ConstLabels: prometheus.Labels{
+ "queue_name": name,
+ },
+ }),
+
+ queueingWaitingTime: promauto.NewHistogram(prometheus.HistogramOpts{
+ Name: "gitlab_workhorse_queueing_waiting_time",
+ Help: "How many time a request spent in queue",
+ ConstLabels: prometheus.Labels{
+ "queue_name": name,
+ },
+ Buckets: waitingTimeBuckets,
+ }),
+
+ queueingErrors: promauto.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitlab_workhorse_queueing_errors",
+ Help: "How many times the TooManyRequests or QueueintTimedout errors were returned while queueing, partitioned by error type",
+ ConstLabels: prometheus.Labels{
+ "queue_name": name,
+ },
+ },
+ []string{"type"},
+ ),
+ }
+
+ return metrics
+}
+
+type Queue struct {
+ *queueMetrics
+
+ name string
+ busyCh chan struct{}
+ waitingCh chan time.Time
+ timeout time.Duration
+}
+
+// newQueue creates a new queue
+// name specifies name used to label queue metrics.
+// Don't call newQueue twice with the same name argument!
+// limit specifies number of requests run concurrently
+// queueLimit specifies maximum number of requests that can be queued
+// timeout specifies the time limit of storing the request in the queue
+// if the number of requests is above the limit
+func newQueue(name string, limit, queueLimit uint, timeout time.Duration) *Queue {
+ queue := &Queue{
+ name: name,
+ busyCh: make(chan struct{}, limit),
+ waitingCh: make(chan time.Time, limit+queueLimit),
+ timeout: timeout,
+ }
+
+ queue.queueMetrics = newQueueMetrics(name, timeout)
+ queue.queueingLimit.Set(float64(limit))
+ queue.queueingQueueLimit.Set(float64(queueLimit))
+ queue.queueingQueueTimeout.Set(timeout.Seconds())
+
+ return queue
+}
+
+// Acquire takes one slot from the Queue
+// and returns when a request should be processed
+// it allows up to (limit) of requests running at a time
+// it allows to queue up to (queue-limit) requests
+func (s *Queue) Acquire() (err error) {
+ // push item to a queue to claim your own slot (non-blocking)
+ select {
+ case s.waitingCh <- time.Now():
+ s.queueingWaiting.Inc()
+ break
+ default:
+ s.queueingErrors.WithLabelValues("too_many_requests").Inc()
+ return ErrTooManyRequests
+ }
+
+ defer func() {
+ if err != nil {
+ waitStarted := <-s.waitingCh
+ s.queueingWaiting.Dec()
+ s.queueingWaitingTime.Observe(float64(time.Since(waitStarted).Seconds()))
+ }
+ }()
+
+ // fast path: push item to current processed items (non-blocking)
+ select {
+ case s.busyCh <- struct{}{}:
+ s.queueingBusy.Inc()
+ return nil
+ default:
+ break
+ }
+
+ timer := time.NewTimer(s.timeout)
+ defer timer.Stop()
+
+ // push item to current processed items (blocking)
+ select {
+ case s.busyCh <- struct{}{}:
+ s.queueingBusy.Inc()
+ return nil
+
+ case <-timer.C:
+ s.queueingErrors.WithLabelValues("queueing_timedout").Inc()
+ return ErrQueueingTimedout
+ }
+}
+
+// Release marks the finish of processing of requests
+// It triggers next request to be processed if it's in queue
+func (s *Queue) Release() {
+ // dequeue from queue to allow next request to be processed
+ waitStarted := <-s.waitingCh
+ s.queueingWaiting.Dec()
+ s.queueingWaitingTime.Observe(float64(time.Since(waitStarted).Seconds()))
+
+ <-s.busyCh
+ s.queueingBusy.Dec()
+}
diff --git a/workhorse/internal/queueing/queue_test.go b/workhorse/internal/queueing/queue_test.go
new file mode 100644
index 00000000000..7f5ed9154f4
--- /dev/null
+++ b/workhorse/internal/queueing/queue_test.go
@@ -0,0 +1,62 @@
+package queueing
+
+import (
+ "testing"
+ "time"
+)
+
+func TestNormalQueueing(t *testing.T) {
+ q := newQueue("queue 1", 2, 1, time.Microsecond)
+ err1 := q.Acquire()
+ if err1 != nil {
+ t.Fatal("we should acquire a new slot")
+ }
+
+ err2 := q.Acquire()
+ if err2 != nil {
+ t.Fatal("we should acquire a new slot")
+ }
+
+ err3 := q.Acquire()
+ if err3 != ErrQueueingTimedout {
+ t.Fatal("we should timeout")
+ }
+
+ q.Release()
+
+ err4 := q.Acquire()
+ if err4 != nil {
+ t.Fatal("we should acquire a new slot")
+ }
+}
+
+func TestQueueLimit(t *testing.T) {
+ q := newQueue("queue 2", 1, 0, time.Microsecond)
+ err1 := q.Acquire()
+ if err1 != nil {
+ t.Fatal("we should acquire a new slot")
+ }
+
+ err2 := q.Acquire()
+ if err2 != ErrTooManyRequests {
+ t.Fatal("we should fail because of not enough slots in queue")
+ }
+}
+
+func TestQueueProcessing(t *testing.T) {
+ q := newQueue("queue 3", 1, 1, time.Second)
+ err1 := q.Acquire()
+ if err1 != nil {
+ t.Fatal("we should acquire a new slot")
+ }
+
+ go func() {
+ time.Sleep(50 * time.Microsecond)
+ q.Release()
+ }()
+
+ err2 := q.Acquire()
+ if err2 != nil {
+ t.Fatal("we should acquire slot after the previous one finished")
+ }
+}
diff --git a/workhorse/internal/queueing/requests.go b/workhorse/internal/queueing/requests.go
new file mode 100644
index 00000000000..409a7656fa4
--- /dev/null
+++ b/workhorse/internal/queueing/requests.go
@@ -0,0 +1,51 @@
+package queueing
+
+import (
+ "net/http"
+ "time"
+
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
+)
+
+const (
+ DefaultTimeout = 30 * time.Second
+ httpStatusTooManyRequests = 429
+)
+
+// QueueRequests creates a new request queue
+// name specifies the name of queue, used to label Prometheus metrics
+// Don't call QueueRequests twice with the same name argument!
+// h specifies a http.Handler which will handle the queue requests
+// limit specifies number of requests run concurrently
+// queueLimit specifies maximum number of requests that can be queued
+// queueTimeout specifies the time limit of storing the request in the queue
+func QueueRequests(name string, h http.Handler, limit, queueLimit uint, queueTimeout time.Duration) http.Handler {
+ if limit == 0 {
+ return h
+ }
+ if queueTimeout == 0 {
+ queueTimeout = DefaultTimeout
+ }
+
+ queue := newQueue(name, limit, queueLimit, queueTimeout)
+
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ err := queue.Acquire()
+
+ switch err {
+ case nil:
+ defer queue.Release()
+ h.ServeHTTP(w, r)
+
+ case ErrTooManyRequests:
+ http.Error(w, "Too Many Requests", httpStatusTooManyRequests)
+
+ case ErrQueueingTimedout:
+ http.Error(w, "Service Unavailable", http.StatusServiceUnavailable)
+
+ default:
+ helper.Fail500(w, r, err)
+ }
+
+ })
+}
diff --git a/workhorse/internal/queueing/requests_test.go b/workhorse/internal/queueing/requests_test.go
new file mode 100644
index 00000000000..f1c52e5c6f5
--- /dev/null
+++ b/workhorse/internal/queueing/requests_test.go
@@ -0,0 +1,76 @@
+package queueing
+
+import (
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+ "time"
+)
+
+var httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ fmt.Fprintln(w, "OK")
+})
+
+func pausedHttpHandler(pauseCh chan struct{}) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ <-pauseCh
+ fmt.Fprintln(w, "OK")
+ })
+}
+
+func TestNormalRequestProcessing(t *testing.T) {
+ w := httptest.NewRecorder()
+ h := QueueRequests("Normal request processing", httpHandler, 1, 1, time.Second)
+ h.ServeHTTP(w, nil)
+ if w.Code != 200 {
+ t.Fatal("QueueRequests should process request")
+ }
+}
+
+// testSlowRequestProcessing creates a new queue,
+// then it runs a number of requests that are going through queue,
+// we return the response of first finished request,
+// where status of request can be 200, 429 or 503
+func testSlowRequestProcessing(name string, count int, limit, queueLimit uint, queueTimeout time.Duration) *httptest.ResponseRecorder {
+ pauseCh := make(chan struct{})
+ defer close(pauseCh)
+
+ handler := QueueRequests("Slow request processing: "+name, pausedHttpHandler(pauseCh), limit, queueLimit, queueTimeout)
+
+ respCh := make(chan *httptest.ResponseRecorder, count)
+
+ // queue requests to use up the queue
+ for i := 0; i < count; i++ {
+ go func() {
+ w := httptest.NewRecorder()
+ handler.ServeHTTP(w, nil)
+ respCh <- w
+ }()
+ }
+
+ // dequeue first request
+ return <-respCh
+}
+
+// TestQueueingTimeout performs 2 requests
+// the queue limit and length is 1,
+// the second request gets timed-out
+func TestQueueingTimeout(t *testing.T) {
+ w := testSlowRequestProcessing("timeout", 2, 1, 1, time.Microsecond)
+
+ if w.Code != 503 {
+ t.Fatal("QueueRequests should timeout queued request")
+ }
+}
+
+// TestQueueingTooManyRequests performs 3 requests
+// the queue limit and length is 1,
+// so the third request has to be rejected with 429
+func TestQueueingTooManyRequests(t *testing.T) {
+ w := testSlowRequestProcessing("too many requests", 3, 1, 1, time.Minute)
+
+ if w.Code != 429 {
+ t.Fatal("QueueRequests should return immediately and return too many requests")
+ }
+}