diff options
Diffstat (limited to 'workhorse/internal/queueing')
-rw-r--r-- | workhorse/internal/queueing/queue.go | 201 | ||||
-rw-r--r-- | workhorse/internal/queueing/queue_test.go | 62 | ||||
-rw-r--r-- | workhorse/internal/queueing/requests.go | 51 | ||||
-rw-r--r-- | workhorse/internal/queueing/requests_test.go | 76 |
4 files changed, 390 insertions, 0 deletions
diff --git a/workhorse/internal/queueing/queue.go b/workhorse/internal/queueing/queue.go new file mode 100644 index 00000000000..db082cf19c6 --- /dev/null +++ b/workhorse/internal/queueing/queue.go @@ -0,0 +1,201 @@ +package queueing + +import ( + "errors" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type errTooManyRequests struct{ error } +type errQueueingTimedout struct{ error } + +var ErrTooManyRequests = &errTooManyRequests{errors.New("too many requests queued")} +var ErrQueueingTimedout = &errQueueingTimedout{errors.New("queueing timedout")} + +type queueMetrics struct { + queueingLimit prometheus.Gauge + queueingQueueLimit prometheus.Gauge + queueingQueueTimeout prometheus.Gauge + queueingBusy prometheus.Gauge + queueingWaiting prometheus.Gauge + queueingWaitingTime prometheus.Histogram + queueingErrors *prometheus.CounterVec +} + +// newQueueMetrics prepares Prometheus metrics for queueing mechanism +// name specifies name of the queue, used to label metrics with ConstLabel `queue_name` +// Don't call newQueueMetrics twice with the same name argument! +// (every metric below is created through promauto, which registers it on creation; +// per the promauto documentation a duplicate registration panics) 
+// timeout specifies the timeout of storing a request in queue - queueMetrics +// uses it to calculate histogram buckets for gitlab_workhorse_queueing_waiting_time +// metric +func newQueueMetrics(name string, timeout time.Duration) *queueMetrics { + // bucket upper bounds in seconds: fixed fractions of timeout, from 1% up to the full timeout + waitingTimeBuckets := []float64{ + timeout.Seconds() * 0.01, + timeout.Seconds() * 0.05, + timeout.Seconds() * 0.10, + timeout.Seconds() * 0.25, + timeout.Seconds() * 0.50, + timeout.Seconds() * 0.75, + timeout.Seconds() * 0.90, + timeout.Seconds() * 0.95, + timeout.Seconds() * 0.99, + timeout.Seconds(), + } + + metrics := &queueMetrics{ + queueingLimit: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "gitlab_workhorse_queueing_limit", + Help: "Current limit set for the queueing mechanism", + ConstLabels: prometheus.Labels{ + "queue_name": name, + }, + }), + + queueingQueueLimit: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "gitlab_workhorse_queueing_queue_limit", + Help: "Current queueLimit set for the queueing mechanism", + ConstLabels: prometheus.Labels{ + "queue_name": name, + }, + }), + + queueingQueueTimeout: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "gitlab_workhorse_queueing_queue_timeout", + Help: "Current queueTimeout set for the queueing mechanism", + ConstLabels: prometheus.Labels{ + "queue_name": name, + }, + }), + + queueingBusy: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "gitlab_workhorse_queueing_busy", + Help: "How many queued requests are now processed", + ConstLabels: prometheus.Labels{ + "queue_name": name, + }, + }), + + queueingWaiting: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "gitlab_workhorse_queueing_waiting", + Help: "How many requests are now queued", + ConstLabels: prometheus.Labels{ + "queue_name": name, + }, + }), + + queueingWaitingTime: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "gitlab_workhorse_queueing_waiting_time", + Help: "How many time a request spent in queue", + ConstLabels: prometheus.Labels{ + "queue_name": name, + }, + Buckets: 
waitingTimeBuckets, + }), + + queueingErrors: promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitlab_workhorse_queueing_errors", + Help: "How many times the TooManyRequests or QueueintTimedout errors were returned while queueing, partitioned by error type", + ConstLabels: prometheus.Labels{ + "queue_name": name, + }, + }, + []string{"type"}, + ), + } + + return metrics +} + +type Queue struct { + *queueMetrics + + name string + busyCh chan struct{} + waitingCh chan time.Time + timeout time.Duration +} + +// newQueue creates a new queue +// name specifies name used to label queue metrics. +// Don't call newQueue twice with the same name argument! +// limit specifies number of requests run concurrently +// queueLimit specifies maximum number of requests that can be queued +// timeout specifies the time limit of storing the request in the queue +// if the number of requests is above the limit +// waitingCh is sized limit+queueLimit because a request keeps its waitingCh entry +// while it is being processed: an entry is only removed by Release or by a failed Acquire +// the limit, queueLimit and timeout values are also exported through the corresponding gauges +func newQueue(name string, limit, queueLimit uint, timeout time.Duration) *Queue { + queue := &Queue{ + name: name, + busyCh: make(chan struct{}, limit), + waitingCh: make(chan time.Time, limit+queueLimit), + timeout: timeout, + } + + queue.queueMetrics = newQueueMetrics(name, timeout) + queue.queueingLimit.Set(float64(limit)) + queue.queueingQueueLimit.Set(float64(queueLimit)) + queue.queueingQueueTimeout.Set(timeout.Seconds()) + + return queue +} + +// Acquire takes one slot from the Queue +// and returns when a request should be processed +// it allows up to (limit) of requests running at a time +// it allows to queue up to (queue-limit) requests +// it returns nil once a slot in busyCh is taken, ErrTooManyRequests when waitingCh +// is already full, or ErrQueueingTimedout when no busyCh slot frees up within s.timeout +// both error cases increment queueingErrors with the matching "type" label +func (s *Queue) Acquire() (err error) { + // push item to a queue to claim your own slot (non-blocking) + select { + case s.waitingCh <- time.Now(): + s.queueingWaiting.Inc() + break + default: + s.queueingErrors.WithLabelValues("too_many_requests").Inc() + return ErrTooManyRequests + } + + // undo the push above when Acquire fails; err is the named result, so this + // deferred function sees the value set by the return statements below + defer func() { + if err != nil { + waitStarted := <-s.waitingCh + s.queueingWaiting.Dec() + 
s.queueingWaitingTime.Observe(float64(time.Since(waitStarted).Seconds())) + } + }() + + // fast path: push item to current processed items (non-blocking) + select { + case s.busyCh <- struct{}{}: + s.queueingBusy.Inc() + return nil + default: + break + } + + // slow path: every busyCh slot is taken, wait for one for at most s.timeout + timer := time.NewTimer(s.timeout) + defer timer.Stop() + + // push item to current processed items (blocking) + select { + case s.busyCh <- struct{}{}: + s.queueingBusy.Inc() + return nil + + case <-timer.C: + s.queueingErrors.WithLabelValues("queueing_timedout").Inc() + return ErrQueueingTimedout + } +} + +// Release marks the finish of processing of requests +// It triggers next request to be processed if it's in queue +// It receives one entry from waitingCh and then one from busyCh, so it is meant to +// follow a successful Acquire; with nothing acquired these receives block +// NOTE(review): the timestamp received here is the oldest entry in waitingCh (channels +// are FIFO), not necessarily the one stored by the request being released +// NOTE(review): a successful Acquire leaves its entry in waitingCh until Release, so the +// queueingWaiting gauge and the waiting-time histogram also cover time spent processing +func (s *Queue) Release() { + // dequeue from queue to allow next request to be processed + waitStarted := <-s.waitingCh + s.queueingWaiting.Dec() + s.queueingWaitingTime.Observe(float64(time.Since(waitStarted).Seconds())) + + <-s.busyCh + s.queueingBusy.Dec() +} diff --git a/workhorse/internal/queueing/queue_test.go b/workhorse/internal/queueing/queue_test.go new file mode 100644 index 00000000000..7f5ed9154f4 --- /dev/null +++ b/workhorse/internal/queueing/queue_test.go @@ -0,0 +1,62 @@ +package queueing + +import ( + "testing" + "time" +) + +func TestNormalQueueing(t *testing.T) { + q := newQueue("queue 1", 2, 1, time.Microsecond) + err1 := q.Acquire() + if err1 != nil { + t.Fatal("we should acquire a new slot") + } + + err2 := q.Acquire() + if err2 != nil { + t.Fatal("we should acquire a new slot") + } + + err3 := q.Acquire() + if err3 != ErrQueueingTimedout { + t.Fatal("we should timeout") + } + + q.Release() + + err4 := q.Acquire() + if err4 != nil { + t.Fatal("we should acquire a new slot") + } +} + +func TestQueueLimit(t *testing.T) { + q := newQueue("queue 2", 1, 0, time.Microsecond) + err1 := q.Acquire() + if err1 != nil { + t.Fatal("we should acquire a new slot") + } + + err2 := q.Acquire() + if err2 != ErrTooManyRequests { + t.Fatal("we should fail because of not enough slots in 
queue") + } +} + +func TestQueueProcessing(t *testing.T) { + q := newQueue("queue 3", 1, 1, time.Second) + err1 := q.Acquire() + if err1 != nil { + t.Fatal("we should acquire a new slot") + } + + go func() { + time.Sleep(50 * time.Microsecond) + q.Release() + }() + + err2 := q.Acquire() + if err2 != nil { + t.Fatal("we should acquire slot after the previous one finished") + } +} diff --git a/workhorse/internal/queueing/requests.go b/workhorse/internal/queueing/requests.go new file mode 100644 index 00000000000..409a7656fa4 --- /dev/null +++ b/workhorse/internal/queueing/requests.go @@ -0,0 +1,51 @@ +package queueing + +import ( + "net/http" + "time" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper" +) + +const ( + DefaultTimeout = 30 * time.Second + httpStatusTooManyRequests = 429 +) + +// QueueRequests creates a new request queue +// name specifies the name of queue, used to label Prometheus metrics +// Don't call QueueRequests twice with the same name argument! +// h specifies a http.Handler which will handle the queue requests +// limit specifies number of requests run concurrently +// queueLimit specifies maximum number of requests that can be queued +// queueTimeout specifies the time limit of storing the request in the queue +// a limit of 0 disables queueing: h is returned as is and no queue is created +// a queueTimeout of 0 is replaced by DefaultTimeout +// the returned handler answers 429 when Acquire reports ErrTooManyRequests and +// 503 when it reports ErrQueueingTimedout; any other error goes to helper.Fail500 +func QueueRequests(name string, h http.Handler, limit, queueLimit uint, queueTimeout time.Duration) http.Handler { + if limit == 0 { + return h + } + if queueTimeout == 0 { + queueTimeout = DefaultTimeout + } + + queue := newQueue(name, limit, queueLimit, queueTimeout) + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + err := queue.Acquire() + + switch err { + case nil: + // the slot is held for the whole ServeHTTP call and released when this handler returns + defer queue.Release() + h.ServeHTTP(w, r) + + case ErrTooManyRequests: + http.Error(w, "Too Many Requests", httpStatusTooManyRequests) + + case ErrQueueingTimedout: + http.Error(w, "Service Unavailable", http.StatusServiceUnavailable) + + default: + helper.Fail500(w, r, err) + } + + }) +} diff --git 
a/workhorse/internal/queueing/requests_test.go b/workhorse/internal/queueing/requests_test.go new file mode 100644 index 00000000000..f1c52e5c6f5 --- /dev/null +++ b/workhorse/internal/queueing/requests_test.go @@ -0,0 +1,76 @@ +package queueing + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +var httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, "OK") +}) + +func pausedHttpHandler(pauseCh chan struct{}) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + <-pauseCh + fmt.Fprintln(w, "OK") + }) +} + +func TestNormalRequestProcessing(t *testing.T) { + w := httptest.NewRecorder() + h := QueueRequests("Normal request processing", httpHandler, 1, 1, time.Second) + h.ServeHTTP(w, nil) + if w.Code != 200 { + t.Fatal("QueueRequests should process request") + } +} + +// testSlowRequestProcessing creates a new queue, +// then it runs a number of requests that are going through queue, +// we return the response of first finished request, +// where status of request can be 200, 429 or 503 +// accepted requests stay blocked in pausedHttpHandler until this function returns, +// so the first response normally comes from a request the queue rejected +func testSlowRequestProcessing(name string, count int, limit, queueLimit uint, queueTimeout time.Duration) *httptest.ResponseRecorder { + // closing pauseCh on return unblocks every handler still waiting in pausedHttpHandler + pauseCh := make(chan struct{}) + defer close(pauseCh) + + handler := QueueRequests("Slow request processing: "+name, pausedHttpHandler(pauseCh), limit, queueLimit, queueTimeout) + + // buffered to count so requests that finish after we return can still send without blocking + respCh := make(chan *httptest.ResponseRecorder, count) + + // queue requests to use up the queue + for i := 0; i < count; i++ { + go func() { + w := httptest.NewRecorder() + handler.ServeHTTP(w, nil) + respCh <- w + }() + } + + // dequeue first request + return <-respCh +} + +// TestQueueingTimeout performs 2 requests +// the queue limit and length is 1, +// the second request gets timed-out +func TestQueueingTimeout(t *testing.T) { + w := testSlowRequestProcessing("timeout", 2, 1, 1, time.Microsecond) + + if w.Code != 503 { + t.Fatal("QueueRequests should 
timeout queued request") + } +} + +// TestQueueingTooManyRequests performs 3 requests +// the queue limit and length is 1, +// so the third request has to be rejected with 429 +func TestQueueingTooManyRequests(t *testing.T) { + w := testSlowRequestProcessing("too many requests", 3, 1, 1, time.Minute) + + if w.Code != 429 { + t.Fatal("QueueRequests should return immediately and return too many requests") + } +} |