diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2021-10-02 00:11:37 +0300 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2021-10-02 00:11:37 +0300 |
commit | fd7da8784c9b0f79dc5afeb83bf2313c2cdc1ffc (patch) | |
tree | 5726a2f9649d04b273b5d4e00421c87e0375216c /workhorse/internal/upstream | |
parent | 0f295cd16f516ec10e6cd0b3fa5846563c08d9b8 (diff) |
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'workhorse/internal/upstream')
-rw-r--r-- | workhorse/internal/upstream/upstream.go | 18 | ||||
-rw-r--r-- | workhorse/internal/upstream/upstream_test.go | 99 |
2 files changed, 87 insertions, 30 deletions
diff --git a/workhorse/internal/upstream/upstream.go b/workhorse/internal/upstream/upstream.go index d6e5e7766b5..e57f58d59dd 100644 --- a/workhorse/internal/upstream/upstream.go +++ b/workhorse/internal/upstream/upstream.go @@ -50,7 +50,7 @@ type upstream struct { geoLocalRoutes []routeEntry geoProxyCableRoute routeEntry geoProxyRoute routeEntry - geoProxyTestChannel chan struct{} + geoProxyPollSleep func(time.Duration) accessLogger *logrus.Logger enableGeoProxyFeature bool mu sync.RWMutex @@ -68,6 +68,9 @@ func newUpstream(cfg config.Config, accessLogger *logrus.Logger, routesCallback enableGeoProxyFeature: os.Getenv("GEO_SECONDARY_PROXY") == "1", geoProxyBackend: &url.URL{}, } + if up.geoProxyPollSleep == nil { + up.geoProxyPollSleep = time.Sleep + } if up.Backend == nil { up.Backend = DefaultBackend } @@ -205,13 +208,7 @@ func (u *upstream) findGeoProxyRoute(cleanedPath string, r *http.Request) *route func (u *upstream) pollGeoProxyAPI() { for { u.callGeoProxyAPI() - - // Notify tests when callGeoProxyAPI() finishes - if u.geoProxyTestChannel != nil { - u.geoProxyTestChannel <- struct{}{} - } - - time.Sleep(geoProxyApiPollingInterval) + u.geoProxyPollSleep(geoProxyApiPollingInterval) } } @@ -234,6 +231,11 @@ func (u *upstream) updateGeoProxyFields(geoProxyURL *url.URL) { defer u.mu.Unlock() u.geoProxyBackend = geoProxyURL + + if u.geoProxyBackend.String() == "" { + return + } + geoProxyRoundTripper := roundtripper.NewBackendRoundTripper(u.geoProxyBackend, "", u.ProxyHeadersTimeout, u.DevelopmentMode) geoProxyUpstream := proxypkg.NewProxy(u.geoProxyBackend, u.Version, geoProxyRoundTripper) u.geoProxyCableRoute = u.wsRoute(`^/-/cable\z`, geoProxyUpstream) diff --git a/workhorse/internal/upstream/upstream_test.go b/workhorse/internal/upstream/upstream_test.go index c86c03920f0..efc85dd6c2e 100644 --- a/workhorse/internal/upstream/upstream_test.go +++ b/workhorse/internal/upstream/upstream_test.go @@ -7,6 +7,7 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" @@ -71,10 +72,10 @@ func TestGeoProxyFeatureDisabledOnGeoSecondarySite(t *testing.T) { defer rsDeferredClose() geoProxyEndpointResponseBody := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL) - railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) + railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody) defer deferredClose() - ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, false) + ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, false) defer wsDeferredClose() testCases := []testCase{ @@ -91,10 +92,10 @@ func TestGeoProxyFeatureEnabledOnGeoSecondarySite(t *testing.T) { defer rsDeferredClose() geoProxyEndpointResponseBody := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL) - railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) + railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody) defer deferredClose() - ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, true) + ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, true) defer wsDeferredClose() testCases := []testCase{ @@ -109,10 +110,10 @@ func TestGeoProxyFeatureEnabledOnGeoSecondarySite(t *testing.T) { // This test can be removed when the environment variable `GEO_SECONDARY_PROXY` is removed func TestGeoProxyFeatureDisabledOnNonGeoSecondarySite(t *testing.T) { geoProxyEndpointResponseBody := "{}" - railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) + railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody) defer deferredClose() - ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, false) + ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, false) defer wsDeferredClose() testCases := []testCase{ @@ -126,10 +127,10 @@ func TestGeoProxyFeatureDisabledOnNonGeoSecondarySite(t *testing.T) { func TestGeoProxyFeatureEnabledOnNonGeoSecondarySite(t *testing.T) { geoProxyEndpointResponseBody := "{}" - railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) + railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody) defer deferredClose() - ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, true) + ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, true) defer wsDeferredClose() testCases := []testCase{ @@ -143,10 +144,10 @@ func TestGeoProxyFeatureEnabledOnNonGeoSecondarySite(t *testing.T) { func TestGeoProxyFeatureEnabledButWithAPIError(t *testing.T) { geoProxyEndpointResponseBody := "Invalid response" - railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) + railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody) defer deferredClose() - ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, true) + ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, true) defer wsDeferredClose() testCases := []testCase{ @@ -158,6 +159,49 @@ func TestGeoProxyFeatureEnabledButWithAPIError(t *testing.T) { runTestCases(t, ws, testCases) } +func TestGeoProxyFeatureEnablingAndDisabling(t *testing.T) { + remoteServer, rsDeferredClose := startRemoteServer("Geo primary") + defer rsDeferredClose() + + geoProxyEndpointEnabledResponseBody := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL) + geoProxyEndpointDisabledResponseBody := "{}" + geoProxyEndpointResponseBody := geoProxyEndpointEnabledResponseBody + + railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody) + defer deferredClose() + + ws, wsDeferredClose, waitForNextApiPoll := startWorkhorseServer(railsServer.URL, true) + defer wsDeferredClose() + + testCasesLocal := []testCase{ + {"jobs request is served locally", "/api/v4/jobs/request", "Local Rails server received request to path /api/v4/jobs/request"}, + {"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"}, + {"unknown route is served locally", "/anything", "Local Rails server received request to path /anything"}, + } + + testCasesProxied := []testCase{ + {"jobs request is forwarded", "/api/v4/jobs/request", "Geo primary received request to path /api/v4/jobs/request"}, + {"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"}, + {"unknown route is forwarded", "/anything", "Geo primary received request to path /anything"}, + } + + // Enabled initially, run tests + runTestCases(t, ws, testCasesProxied) + + // Disable proxying and run tests. It's safe to write to + // geoProxyEndpointResponseBody because the polling goroutine is blocked. + geoProxyEndpointResponseBody = geoProxyEndpointDisabledResponseBody + waitForNextApiPoll() + + runTestCases(t, ws, testCasesLocal) + + // Re-enable proxying and run tests + geoProxyEndpointResponseBody = geoProxyEndpointEnabledResponseBody + waitForNextApiPoll() + + runTestCases(t, ws, testCasesProxied) +} + func runTestCases(t *testing.T, ws *httptest.Server, testCases []testCase) { t.Helper() for _, tc := range testCases { @@ -195,13 +239,13 @@ func startRemoteServer(serverName string) (*httptest.Server, func()) { return ts, ts.Close } -func startRailsServer(railsServerName string, geoProxyEndpointResponseBody string) (*httptest.Server, func()) { +func startRailsServer(railsServerName string, geoProxyEndpointResponseBody *string) (*httptest.Server, func()) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var body string if r.URL.Path == geoProxyEndpoint { w.Header().Set("Content-Type", "application/vnd.gitlab-workhorse+json") - body = geoProxyEndpointResponseBody + body = *geoProxyEndpointResponseBody } else { body = railsServerName + " received request to path " + r.URL.Path } @@ -213,15 +257,19 @@ func startRailsServer(railsServerName string, geoProxyEndpointResponseBody strin return ts, ts.Close } -func startWorkhorseServer(railsServerURL string, enableGeoProxyFeature bool) (*httptest.Server, func()) { - geoProxyTestChannel := make(chan struct{}) +func startWorkhorseServer(railsServerURL string, enableGeoProxyFeature bool) (*httptest.Server, func(), func()) { + geoProxySleepC := make(chan struct{}) + geoProxySleep := func(time.Duration) { + geoProxySleepC <- struct{}{} + <-geoProxySleepC + } myConfigureRoutes := func(u *upstream) { // Enable environment variable "feature flag" u.enableGeoProxyFeature = enableGeoProxyFeature - // An empty message will be sent to this channel after every callGeoProxyAPI() - u.geoProxyTestChannel = geoProxyTestChannel + // Replace the time.Sleep function with geoProxySleep + u.geoProxyPollSleep = geoProxySleep // call original configureRoutes(u) @@ -231,13 +279,20 @@ func startWorkhorseServer(railsServerURL string, enableGeoProxyFeature bool) (*h ws := httptest.NewServer(upstreamHandler) testhelper.ConfigureSecret() + waitForNextApiPoll := func() {} + if enableGeoProxyFeature { - // Wait for an empty message from callGeoProxyAPI(). This should be done on - // all tests where enableGeoProxyFeature is true, including the ones where - // we expect geoProxyURL to be nil or error, to ensure the tests do not pass - // by coincidence. - <-geoProxyTestChannel + // Wait for geoProxySleep to be entered for the first time + <-geoProxySleepC + + waitForNextApiPoll = func() { + // Cause geoProxySleep to return + geoProxySleepC <- struct{}{} + + // Wait for geoProxySleep to be entered again + <-geoProxySleepC + } } - return ws, ws.Close + return ws, ws.Close, waitForNextApiPoll } |