diff options
Diffstat (limited to 'workhorse/internal/builds')
-rw-r--r-- | workhorse/internal/builds/register.go | 163 | ||||
-rw-r--r-- | workhorse/internal/builds/register_test.go | 108 |
2 files changed, 271 insertions, 0 deletions
diff --git a/workhorse/internal/builds/register.go b/workhorse/internal/builds/register.go new file mode 100644 index 00000000000..77685889cfd --- /dev/null +++ b/workhorse/internal/builds/register.go @@ -0,0 +1,163 @@ +package builds + +import ( + "encoding/json" + "errors" + "net/http" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper" + "gitlab.com/gitlab-org/gitlab-workhorse/internal/redis" +) + +const ( + maxRegisterBodySize = 32 * 1024 + runnerBuildQueue = "runner:build_queue:" + runnerBuildQueueHeaderKey = "Gitlab-Ci-Builds-Polling" + runnerBuildQueueHeaderValue = "yes" +) + +var ( + registerHandlerRequests = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitlab_workhorse_builds_register_handler_requests", + Help: "Describes how many requests in different states hit a register handler", + }, + []string{"status"}, + ) + registerHandlerOpen = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "gitlab_workhorse_builds_register_handler_open", + Help: "Describes how many requests is currently open in given state", + }, + []string{"state"}, + ) + + registerHandlerOpenAtReading = registerHandlerOpen.WithLabelValues("reading") + registerHandlerOpenAtProxying = registerHandlerOpen.WithLabelValues("proxying") + registerHandlerOpenAtWatching = registerHandlerOpen.WithLabelValues("watching") + + registerHandlerBodyReadErrors = registerHandlerRequests.WithLabelValues("body-read-error") + registerHandlerBodyParseErrors = registerHandlerRequests.WithLabelValues("body-parse-error") + registerHandlerMissingValues = registerHandlerRequests.WithLabelValues("missing-values") + registerHandlerWatchErrors = registerHandlerRequests.WithLabelValues("watch-error") + registerHandlerAlreadyChangedRequests = registerHandlerRequests.WithLabelValues("already-changed") + registerHandlerSeenChangeRequests = registerHandlerRequests.WithLabelValues("seen-change") + registerHandlerTimeoutRequests = registerHandlerRequests.WithLabelValues("timeout") + registerHandlerNoChangeRequests = registerHandlerRequests.WithLabelValues("no-change") +) + +type largeBodyError struct{ error } + +type WatchKeyHandler func(key, value string, timeout time.Duration) (redis.WatchKeyStatus, error) + +type runnerRequest struct { + Token string `json:"token,omitempty"` + LastUpdate string `json:"last_update,omitempty"` +} + +func readRunnerBody(w http.ResponseWriter, r *http.Request) ([]byte, error) { + registerHandlerOpenAtReading.Inc() + defer registerHandlerOpenAtReading.Dec() + + return helper.ReadRequestBody(w, r, maxRegisterBodySize) +} + +func readRunnerRequest(r *http.Request, body []byte) (*runnerRequest, error) { + if !helper.IsApplicationJson(r) { + return nil, errors.New("invalid content-type received") + } + + var runnerRequest runnerRequest + err := json.Unmarshal(body, &runnerRequest) + if err != nil { + return nil, err + } + + return &runnerRequest, nil +} + +func proxyRegisterRequest(h http.Handler, w http.ResponseWriter, r *http.Request) { + registerHandlerOpenAtProxying.Inc() + defer registerHandlerOpenAtProxying.Dec() + + h.ServeHTTP(w, r) +} + +func watchForRunnerChange(watchHandler WatchKeyHandler, token, lastUpdate string, duration time.Duration) (redis.WatchKeyStatus, error) { + registerHandlerOpenAtWatching.Inc() + defer registerHandlerOpenAtWatching.Dec() + + return watchHandler(runnerBuildQueue+token, lastUpdate, duration) +} + +func RegisterHandler(h http.Handler, watchHandler WatchKeyHandler, pollingDuration time.Duration) http.Handler { + if pollingDuration == 0 { + return h + } + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set(runnerBuildQueueHeaderKey, runnerBuildQueueHeaderValue) + + requestBody, err := readRunnerBody(w, r) + if err != nil { + registerHandlerBodyReadErrors.Inc() + helper.RequestEntityTooLarge(w, r, &largeBodyError{err}) + return + } + + newRequest := helper.CloneRequestWithNewBody(r, requestBody) + + runnerRequest, err := readRunnerRequest(r, requestBody) + if err != nil { + registerHandlerBodyParseErrors.Inc() + proxyRegisterRequest(h, w, newRequest) + return + } + + if runnerRequest.Token == "" || runnerRequest.LastUpdate == "" { + registerHandlerMissingValues.Inc() + proxyRegisterRequest(h, w, newRequest) + return + } + + result, err := watchForRunnerChange(watchHandler, runnerRequest.Token, + runnerRequest.LastUpdate, pollingDuration) + if err != nil { + registerHandlerWatchErrors.Inc() + proxyRegisterRequest(h, w, newRequest) + return + } + + switch result { + // It means that we detected a change before starting watching on change, + // We proxy request to Rails, to see whether we have a build to receive + case redis.WatchKeyStatusAlreadyChanged: + registerHandlerAlreadyChangedRequests.Inc() + proxyRegisterRequest(h, w, newRequest) + + // It means that we detected a change after watching. + // We could potentially proxy request to Rails, but... + // We can end-up with unreliable responses, + // as don't really know whether ResponseWriter is still in a sane state, + // for example the connection is dead + case redis.WatchKeyStatusSeenChange: + registerHandlerSeenChangeRequests.Inc() + w.WriteHeader(http.StatusNoContent) + + // When we receive one of these statuses, it means that we detected no change, + // so we return to runner 204, which means nothing got changed, + // and there's no new builds to process + case redis.WatchKeyStatusTimeout: + registerHandlerTimeoutRequests.Inc() + w.WriteHeader(http.StatusNoContent) + + case redis.WatchKeyStatusNoChange: + registerHandlerNoChangeRequests.Inc() + w.WriteHeader(http.StatusNoContent) + } + }) +} diff --git a/workhorse/internal/builds/register_test.go b/workhorse/internal/builds/register_test.go new file mode 100644 index 00000000000..a72d82dc2b7 --- /dev/null +++ b/workhorse/internal/builds/register_test.go @@ -0,0 +1,108 @@ +package builds + +import ( + "bytes" + "errors" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/redis" +) + +const upstreamResponseCode = 999 + +func echoRequest(rw http.ResponseWriter, req *http.Request) { + rw.WriteHeader(upstreamResponseCode) + io.Copy(rw, req.Body) +} + +var echoRequestFunc = http.HandlerFunc(echoRequest) + +func expectHandlerWithWatcher(t *testing.T, watchHandler WatchKeyHandler, data string, contentType string, expectedHttpStatus int, msgAndArgs ...interface{}) { + h := RegisterHandler(echoRequestFunc, watchHandler, time.Second) + + rw := httptest.NewRecorder() + req, _ := http.NewRequest("POST", "/", bytes.NewBufferString(data)) + req.Header.Set("Content-Type", contentType) + + h.ServeHTTP(rw, req) + + require.Equal(t, expectedHttpStatus, rw.Code, msgAndArgs...) +} + +func expectHandler(t *testing.T, data string, contentType string, expectedHttpStatus int, msgAndArgs ...interface{}) { + expectHandlerWithWatcher(t, nil, data, contentType, expectedHttpStatus, msgAndArgs...) +} + +func TestRegisterHandlerLargeBody(t *testing.T) { + data := strings.Repeat(".", maxRegisterBodySize+5) + expectHandler(t, data, "application/json", http.StatusRequestEntityTooLarge, + "rejects body with entity too large") +} + +func TestRegisterHandlerInvalidRunnerRequest(t *testing.T) { + expectHandler(t, "invalid content", "text/plain", upstreamResponseCode, + "proxies request to upstream") +} + +func TestRegisterHandlerInvalidJsonPayload(t *testing.T) { + expectHandler(t, `{[`, "application/json", upstreamResponseCode, + "fails on parsing body and proxies request to upstream") +} + +func TestRegisterHandlerMissingData(t *testing.T) { + testCases := []string{ + `{"token":"token"}`, + `{"last_update":"data"}`, + } + + for _, testCase := range testCases { + expectHandler(t, testCase, "application/json", upstreamResponseCode, + "fails on argument validation and proxies request to upstream") + } +} + +func expectWatcherToBeExecuted(t *testing.T, watchKeyStatus redis.WatchKeyStatus, watchKeyError error, + httpStatus int, msgAndArgs ...interface{}) { + executed := false + watchKeyHandler := func(key, value string, timeout time.Duration) (redis.WatchKeyStatus, error) { + executed = true + return watchKeyStatus, watchKeyError + } + + parsableData := `{"token":"token","last_update":"last_update"}` + + expectHandlerWithWatcher(t, watchKeyHandler, parsableData, "application/json", httpStatus, msgAndArgs...) + require.True(t, executed, msgAndArgs...) +} + +func TestRegisterHandlerWatcherError(t *testing.T) { + expectWatcherToBeExecuted(t, redis.WatchKeyStatusNoChange, errors.New("redis connection"), + upstreamResponseCode, "proxies data to upstream") +} + +func TestRegisterHandlerWatcherAlreadyChanged(t *testing.T) { + expectWatcherToBeExecuted(t, redis.WatchKeyStatusAlreadyChanged, nil, + upstreamResponseCode, "proxies data to upstream") +} + +func TestRegisterHandlerWatcherSeenChange(t *testing.T) { + expectWatcherToBeExecuted(t, redis.WatchKeyStatusSeenChange, nil, + http.StatusNoContent) +} + +func TestRegisterHandlerWatcherTimeout(t *testing.T) { + expectWatcherToBeExecuted(t, redis.WatchKeyStatusTimeout, nil, + http.StatusNoContent) +} + +func TestRegisterHandlerWatcherNoChange(t *testing.T) { + expectWatcherToBeExecuted(t, redis.WatchKeyStatusNoChange, nil, + http.StatusNoContent) +} |