diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2021-07-23 15:09:05 +0300 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2021-07-23 15:09:05 +0300 |
commit | ab8eecd62cc11a31568b25304f5fd31c8b7f437f (patch) | |
tree | b73b765c3cea414112840fd8041c62f886d8ce53 /workhorse | |
parent | 00a889ea7a115ebbda95a071dd630f93b79261e3 (diff) |
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'workhorse')
-rw-r--r-- | workhorse/internal/log/logging.go | 8 | ||||
-rw-r--r-- | workhorse/internal/log/logging_test.go | 11 | ||||
-rw-r--r-- | workhorse/internal/upstream/routes.go | 36 | ||||
-rw-r--r-- | workhorse/internal/upstream/upstream.go | 90 | ||||
-rw-r--r-- | workhorse/internal/upstream/upstream_test.go | 179 |
5 files changed, 298 insertions, 26 deletions
diff --git a/workhorse/internal/log/logging.go b/workhorse/internal/log/logging.go index 80c09c1bf02..ae7164db920 100644 --- a/workhorse/internal/log/logging.go +++ b/workhorse/internal/log/logging.go @@ -67,6 +67,14 @@ func (b *Builder) WithError(err error) *Builder { return b } +func Debug(args ...interface{}) { + NewBuilder().Debug(args...) +} + +func (b *Builder) Debug(args ...interface{}) { + b.entry.Debug(args...) +} + func Info(args ...interface{}) { NewBuilder().Info(args...) } diff --git a/workhorse/internal/log/logging_test.go b/workhorse/internal/log/logging_test.go index 1cb6438532e..9daf282daf4 100644 --- a/workhorse/internal/log/logging_test.go +++ b/workhorse/internal/log/logging_test.go @@ -7,6 +7,7 @@ import ( "net/http/httptest" "testing" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" ) @@ -14,6 +15,7 @@ func captureLogs(b *Builder, testFn func()) string { buf := &bytes.Buffer{} logger := b.entry.Logger + logger.SetLevel(logrus.DebugLevel) oldOut := logger.Out logger.Out = buf defer func() { @@ -25,6 +27,15 @@ func captureLogs(b *Builder, testFn func()) string { return buf.String() } +func TestLogDebug(t *testing.T) { + b := NewBuilder() + logLine := captureLogs(b, func() { + b.Debug("an observation") + }) + + require.Regexp(t, `level=debug msg="an observation"`, logLine) +} + func TestLogInfo(t *testing.T) { b := NewBuilder() logLine := captureLogs(b, func() { diff --git a/workhorse/internal/upstream/routes.go b/workhorse/internal/upstream/routes.go index 241ffe89eee..a4b453f047d 100644 --- a/workhorse/internal/upstream/routes.go +++ b/workhorse/internal/upstream/routes.go @@ -337,6 +337,42 @@ func configureRoutes(u *upstream) { u.route("", "", defaultUpstream), } + + // Routes which should actually be served locally by a Geo Proxy. If none + // matches, then then proxy the request. + u.geoLocalRoutes = []routeEntry{ + // Git and LFS requests + // + // Note that Geo already redirects pushes, with special terminal output. + // Note that excessive secondary lag can cause unexpected behavior since + // pulls are performed against a different source of truth. Ideally, we'd + // proxy/redirect pulls as well, when the secondary is not up-to-date. + // + u.route("GET", gitProjectPattern+`info/refs\z`, git.GetInfoRefsHandler(api)), + u.route("POST", gitProjectPattern+`git-upload-pack\z`, contentEncodingHandler(git.UploadPack(api)), withMatcher(isContentType("application/x-git-upload-pack-request"))), + u.route("POST", gitProjectPattern+`git-receive-pack\z`, contentEncodingHandler(git.ReceivePack(api)), withMatcher(isContentType("application/x-git-receive-pack-request"))), + u.route("PUT", gitProjectPattern+`gitlab-lfs/objects/([0-9a-f]{64})/([0-9]+)\z`, lfs.PutStore(api, signingProxy, preparers.lfs), withMatcher(isContentType("application/octet-stream"))), + + // Serve health checks from this Geo secondary + u.route("", "^/-/(readiness|liveness)$", static.DeployPage(probeUpstream)), + u.route("", "^/-/health$", static.DeployPage(healthUpstream)), + u.route("", "^/-/metrics$", defaultUpstream), + + // Authentication routes + u.route("", "^/users/(sign_in|sign_out)$", defaultUpstream), + u.route("", "^/oauth/geo/(auth|callback|logout)$", defaultUpstream), + + // Admin Area > Geo routes + u.route("", "^/admin/geo$", defaultUpstream), + u.route("", "^/admin/geo/", defaultUpstream), + + // Geo API routes + u.route("", "^/api/v4/geo_nodes", defaultUpstream), + u.route("", "^/api/v4/geo_replication", defaultUpstream), + + // Don't define a catch-all route. If a route does not match, then we know + // the request should be proxied. + } } func createUploadPreparers(cfg config.Config) uploadPreparers { diff --git a/workhorse/internal/upstream/upstream.go b/workhorse/internal/upstream/upstream.go index dd9ee9b5dce..0b46228a0a8 100644 --- a/workhorse/internal/upstream/upstream.go +++ b/workhorse/internal/upstream/upstream.go @@ -9,8 +9,10 @@ package upstream import ( "fmt" "os" + "sync" "net/http" + "net/url" "strings" "github.com/sirupsen/logrus" @@ -21,6 +23,7 @@ import ( "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/log" + proxypkg "gitlab.com/gitlab-org/gitlab/workhorse/internal/proxy" "gitlab.com/gitlab-org/gitlab/workhorse/internal/rejectmethods" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upstream/roundtripper" @@ -41,8 +44,13 @@ type upstream struct { RoundTripper http.RoundTripper CableRoundTripper http.RoundTripper APIClient *apipkg.API + geoProxyBackend *url.URL + geoLocalRoutes []routeEntry + geoProxyCableRoute routeEntry + geoProxyRoute routeEntry accessLogger *logrus.Logger enableGeoProxyFeature bool + mu sync.RWMutex } func NewUpstream(cfg config.Config, accessLogger *logrus.Logger) http.Handler { @@ -119,25 +127,9 @@ func (u *upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - // Look for a matching route - var route *routeEntry + cleanedPath := prefix.Strip(URIPath) - if u.enableGeoProxyFeature { - geoProxyURL, err := u.APIClient.GetGeoProxyURL() - - if err == nil { - log.WithRequest(r).WithFields(log.Fields{"geoProxyURL": geoProxyURL}).Info("Geo Proxy: Set route according to Geo Proxy logic") - } else if err != apipkg.ErrNotGeoSecondary { - log.WithRequest(r).WithError(err).Error("Geo Proxy: Unable to determine Geo Proxy URL. Falling back to normal routing") - } - } - - for _, ro := range u.Routes { - if ro.isMatch(prefix.Strip(URIPath), r) { - route = &ro - break - } - } + route := u.findRoute(cleanedPath, r) if route == nil { // The protocol spec in git/Documentation/technical/http-protocol.txt @@ -152,3 +144,65 @@ func (u *upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) { route.handler.ServeHTTP(w, r) } + +func (u *upstream) findRoute(cleanedPath string, r *http.Request) *routeEntry { + if u.enableGeoProxyFeature { + if route := u.findGeoProxyRoute(cleanedPath, r); route != nil { + return route + } + } + + for _, ro := range u.Routes { + if ro.isMatch(cleanedPath, r) { + return &ro + } + } + + return nil +} + +func (u *upstream) findGeoProxyRoute(cleanedPath string, r *http.Request) *routeEntry { + geoProxyURL, err := u.APIClient.GetGeoProxyURL() + + if err == nil { + u.setGeoProxyRoutes(geoProxyURL) + return u.matchGeoProxyRoute(cleanedPath, r) + } else if err != apipkg.ErrNotGeoSecondary { + log.WithRequest(r).WithError(err).Error("Geo Proxy: Unable to determine Geo Proxy URL. Falling back to normal routing") + } + + return nil +} + +func (u *upstream) matchGeoProxyRoute(cleanedPath string, r *http.Request) *routeEntry { + // Some routes are safe to serve from this GitLab instance + for _, ro := range u.geoLocalRoutes { + if ro.isMatch(cleanedPath, r) { + log.WithRequest(r).Debug("Geo Proxy: Handle this request locally") + return &ro + } + } + + log.WithRequest(r).WithFields(log.Fields{"geoProxyBackend": u.geoProxyBackend}).Debug("Geo Proxy: Forward this request") + + u.mu.RLock() + defer u.mu.RUnlock() + if cleanedPath == "/-/cable" { + return &u.geoProxyCableRoute + } + + return &u.geoProxyRoute +} + +func (u *upstream) setGeoProxyRoutes(geoProxyURL *url.URL) { + u.mu.Lock() + defer u.mu.Unlock() + if u.geoProxyBackend == nil || u.geoProxyBackend.String() != geoProxyURL.String() { + log.WithFields(log.Fields{"geoProxyURL": geoProxyURL}).Debug("Geo Proxy: Update GeoProxyRoute") + u.geoProxyBackend = geoProxyURL + geoProxyRoundTripper := roundtripper.NewBackendRoundTripper(u.geoProxyBackend, "", u.ProxyHeadersTimeout, u.DevelopmentMode) + geoProxyUpstream := proxypkg.NewProxy(u.geoProxyBackend, u.Version, geoProxyRoundTripper) + u.geoProxyCableRoute = u.wsRoute(`^/-/cable\z`, geoProxyUpstream) + u.geoProxyRoute = u.route("", "", geoProxyUpstream) + } +} diff --git a/workhorse/internal/upstream/upstream_test.go b/workhorse/internal/upstream/upstream_test.go index adc881474f5..a3dcc380d64 100644 --- a/workhorse/internal/upstream/upstream_test.go +++ b/workhorse/internal/upstream/upstream_test.go @@ -1,6 +1,7 @@ package upstream import ( + "fmt" "io" "io/ioutil" "net/http" @@ -11,8 +12,21 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper" ) +const ( + geoProxyEndpoint = "/api/v4/geo/proxy" + testDocumentRoot = "testdata/public" +) + +type testCase struct { + desc string + path string + expectedResponse string +} + func TestRouting(t *testing.T) { handle := func(u *upstream, regex string) routeEntry { handler := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { @@ -34,15 +48,10 @@ func TestRouting(t *testing.T) { handle(u, main), } }) - ts := httptest.NewServer(u) defer ts.Close() - testCases := []struct { - desc string - path string - route string - }{ + testCases := []testCase{ {"main route works", "/", main}, {"foobar route works", "/foobar", foobar}, {"quxbaz route works", "/quxbaz", quxbaz}, @@ -51,9 +60,109 @@ func TestRouting(t *testing.T) { {"double escaped path traversal does not match any route", "/foobar%252f%252e%252e%252fquxbaz", main}, } + runTestCases(t, ts, testCases) +} + +// This test can be removed when the environment variable `GEO_SECONDARY_PROXY` is removed +func TestGeoProxyFeatureDisabledOnGeoSecondarySite(t *testing.T) { + // We could just not set up the primary, but then we'd have to assert + // that the internal API call isn't made. This is easier. + remoteServer, rsDeferredClose := startRemoteServer("Geo primary") + defer rsDeferredClose() + + geoProxyEndpointResponseBody := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL) + railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) + defer deferredClose() + + ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, false) + defer wsDeferredClose() + + testCases := []testCase{ + {"jobs request is served locally", "/api/v4/jobs/request", "Local Rails server received request to path /api/v4/jobs/request"}, + {"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"}, + {"unknown route is served locally", "/anything", "Local Rails server received request to path /anything"}, + } + + runTestCases(t, ws, testCases) +} + +func TestGeoProxyFeatureEnabledOnGeoSecondarySite(t *testing.T) { + remoteServer, rsDeferredClose := startRemoteServer("Geo primary") + defer rsDeferredClose() + + geoProxyEndpointResponseBody := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL) + railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) + defer deferredClose() + + ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, true) + defer wsDeferredClose() + + testCases := []testCase{ + {"jobs request is forwarded", "/api/v4/jobs/request", "Geo primary received request to path /api/v4/jobs/request"}, + {"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"}, + {"unknown route is forwarded", "/anything", "Geo primary received request to path /anything"}, + } + + runTestCases(t, ws, testCases) +} + +// This test can be removed when the environment variable `GEO_SECONDARY_PROXY` is removed +func TestGeoProxyFeatureDisabledOnNonGeoSecondarySite(t *testing.T) { + geoProxyEndpointResponseBody := "{}" + railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) + defer deferredClose() + + ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, false) + defer wsDeferredClose() + + testCases := []testCase{ + {"jobs request is served locally", "/api/v4/jobs/request", "Local Rails server received request to path /api/v4/jobs/request"}, + {"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"}, + {"unknown route is served locally", "/anything", "Local Rails server received request to path /anything"}, + } + + runTestCases(t, ws, testCases) +} + +func TestGeoProxyFeatureEnabledOnNonGeoSecondarySite(t *testing.T) { + geoProxyEndpointResponseBody := "{}" + railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) + defer deferredClose() + + ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, true) + defer wsDeferredClose() + + testCases := []testCase{ + {"jobs request is served locally", "/api/v4/jobs/request", "Local Rails server received request to path /api/v4/jobs/request"}, + {"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"}, + {"unknown route is served locally", "/anything", "Local Rails server received request to path /anything"}, + } + + runTestCases(t, ws, testCases) +} + +func TestGeoProxyWithAPIError(t *testing.T) { + geoProxyEndpointResponseBody := "Invalid response" + railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) + defer deferredClose() + + ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, true) + defer wsDeferredClose() + + testCases := []testCase{ + {"jobs request is served locally", "/api/v4/jobs/request", "Local Rails server received request to path /api/v4/jobs/request"}, + {"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"}, + {"unknown route is served locally", "/anything", "Local Rails server received request to path /anything"}, + } + + runTestCases(t, ws, testCases) +} + +func runTestCases(t *testing.T, ws *httptest.Server, testCases []testCase) { + t.Helper() for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - resp, err := http.Get(ts.URL + tc.path) + resp, err := http.Get(ws.URL + tc.path) require.NoError(t, err) defer resp.Body.Close() @@ -61,7 +170,61 @@ func TestRouting(t *testing.T) { require.NoError(t, err) require.Equal(t, 200, resp.StatusCode, "response code") - require.Equal(t, tc.route, string(body)) + require.Equal(t, tc.expectedResponse, string(body)) }) } } + +func newUpstreamConfig(authBackend string) *config.Config { + return &config.Config{ + Version: "123", + DocumentRoot: testDocumentRoot, + Backend: helper.URLMustParse(authBackend), + ImageResizerConfig: config.DefaultImageResizerConfig, + } +} + +func startRemoteServer(serverName string) (*httptest.Server, func()) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body := serverName + " received request to path " + r.URL.Path + + w.WriteHeader(200) + fmt.Fprint(w, body) + })) + + return ts, ts.Close +} + +func startRailsServer(railsServerName string, geoProxyEndpointResponseBody string) (*httptest.Server, func()) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var body string + + if r.URL.Path == geoProxyEndpoint { + w.Header().Set("Content-Type", "application/vnd.gitlab-workhorse+json") + body = geoProxyEndpointResponseBody + } else { + body = railsServerName + " received request to path " + r.URL.Path + } + + w.WriteHeader(200) + fmt.Fprint(w, body) + })) + + return ts, ts.Close +} + +func startWorkhorseServer(railsServerURL string, enableGeoProxyFeature bool) (*httptest.Server, func()) { + myConfigureRoutes := func(u *upstream) { + // Enable environment variable "feature flag" + u.enableGeoProxyFeature = enableGeoProxyFeature + + // call original + configureRoutes(u) + } + cfg := newUpstreamConfig(railsServerURL) + upstreamHandler := newUpstream(*cfg, logrus.StandardLogger(), myConfigureRoutes) + ws := httptest.NewServer(upstreamHandler) + testhelper.ConfigureSecret() + + return ws, ws.Close +} |