diff options
Diffstat (limited to 'workhorse/internal')
-rw-r--r-- | workhorse/internal/builds/register.go | 9 | ||||
-rw-r--r-- | workhorse/internal/builds/register_test.go | 3 | ||||
-rw-r--r-- | workhorse/internal/redis/keywatcher.go | 3 | ||||
-rw-r--r-- | workhorse/internal/redis/keywatcher_test.go | 13 | ||||
-rw-r--r-- | workhorse/internal/upstream/routes.go | 4 | ||||
-rw-r--r-- | workhorse/internal/upstream/routes_test.go | 11 | ||||
-rw-r--r-- | workhorse/internal/upstream/upstream_test.go | 38 |
7 files changed, 70 insertions, 11 deletions
diff --git a/workhorse/internal/builds/register.go b/workhorse/internal/builds/register.go index 0a2fe47ed7e..f45d03ab8ed 100644 --- a/workhorse/internal/builds/register.go +++ b/workhorse/internal/builds/register.go @@ -2,6 +2,7 @@ package builds import ( "bytes" + "context" "encoding/json" "errors" "io" @@ -55,7 +56,7 @@ var ( type largeBodyError struct{ error } -type WatchKeyHandler func(key, value string, timeout time.Duration) (redis.WatchKeyStatus, error) +type WatchKeyHandler func(ctx context.Context, key, value string, timeout time.Duration) (redis.WatchKeyStatus, error) type runnerRequest struct { Token string `json:"token,omitempty"` @@ -102,11 +103,11 @@ func proxyRegisterRequest(h http.Handler, w http.ResponseWriter, r *http.Request h.ServeHTTP(w, r) } -func watchForRunnerChange(watchHandler WatchKeyHandler, token, lastUpdate string, duration time.Duration) (redis.WatchKeyStatus, error) { +func watchForRunnerChange(ctx context.Context, watchHandler WatchKeyHandler, token, lastUpdate string, duration time.Duration) (redis.WatchKeyStatus, error) { registerHandlerOpenAtWatching.Inc() defer registerHandlerOpenAtWatching.Dec() - return watchHandler(runnerBuildQueue+token, lastUpdate, duration) + return watchHandler(ctx, runnerBuildQueue+token, lastUpdate, duration) } func RegisterHandler(h http.Handler, watchHandler WatchKeyHandler, pollingDuration time.Duration) http.Handler { @@ -140,7 +141,7 @@ func RegisterHandler(h http.Handler, watchHandler WatchKeyHandler, pollingDurati return } - result, err := watchForRunnerChange(watchHandler, runnerRequest.Token, + result, err := watchForRunnerChange(r.Context(), watchHandler, runnerRequest.Token, runnerRequest.LastUpdate, pollingDuration) if err != nil { registerHandlerWatchErrors.Inc() diff --git a/workhorse/internal/builds/register_test.go b/workhorse/internal/builds/register_test.go index d5cbebd500b..97d66517ac9 100644 --- a/workhorse/internal/builds/register_test.go +++ b/workhorse/internal/builds/register_test.go @@ -2,6 +2,7 @@ package builds import ( "bytes" + "context" "errors" "io" "net/http" @@ -71,7 +72,7 @@ func TestRegisterHandlerMissingData(t *testing.T) { func expectWatcherToBeExecuted(t *testing.T, watchKeyStatus redis.WatchKeyStatus, watchKeyError error, httpStatus int, msgAndArgs ...interface{}) { executed := false - watchKeyHandler := func(key, value string, timeout time.Duration) (redis.WatchKeyStatus, error) { + watchKeyHandler := func(ctx context.Context, key, value string, timeout time.Duration) (redis.WatchKeyStatus, error) { executed = true return watchKeyStatus, watchKeyError } diff --git a/workhorse/internal/redis/keywatcher.go b/workhorse/internal/redis/keywatcher.go index cdf6ccd7e83..2fd0753c3c9 100644 --- a/workhorse/internal/redis/keywatcher.go +++ b/workhorse/internal/redis/keywatcher.go @@ -1,6 +1,7 @@ package redis import ( + "context" "errors" "fmt" "strings" @@ -251,7 +252,7 @@ const ( WatchKeyStatusNoChange ) -func (kw *KeyWatcher) WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error) { +func (kw *KeyWatcher) WatchKey(_ context.Context, key, value string, timeout time.Duration) (WatchKeyStatus, error) { notify := make(chan string, 1) if err := kw.addSubscription(key, notify); err != nil { return WatchKeyStatusNoChange, err diff --git a/workhorse/internal/redis/keywatcher_test.go b/workhorse/internal/redis/keywatcher_test.go index bae49d81bb1..3abc1bf1107 100644 --- a/workhorse/internal/redis/keywatcher_test.go +++ b/workhorse/internal/redis/keywatcher_test.go @@ -1,6 +1,7 @@ package redis import ( + "context" "sync" "testing" "time" @@ -10,6 +11,8 @@ import ( "github.com/stretchr/testify/require" ) +var ctx = context.Background() + const ( runnerKey = "runner:build_queue:10" ) @@ -131,7 +134,7 @@ func TestKeyChangesInstantReturn(t *testing.T) { defer kw.Shutdown() kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()} - val, err := kw.WatchKey(runnerKey, tc.watchValue, tc.timeout) + val, err := kw.WatchKey(ctx, runnerKey, tc.watchValue, tc.timeout) require.NoError(t, err, "Expected no error") require.Equal(t, tc.expectedStatus, val, "Expected value") @@ -187,7 +190,7 @@ func TestKeyChangesWhenWatching(t *testing.T) { go func() { defer wg.Done() <-ready - val, err := kw.WatchKey(runnerKey, tc.watchValue, time.Second) + val, err := kw.WatchKey(ctx, runnerKey, tc.watchValue, time.Second) require.NoError(t, err, "Expected no error") require.Equal(t, tc.expectedStatus, val, "Expected value") @@ -245,7 +248,7 @@ func TestKeyChangesParallel(t *testing.T) { go func() { defer wg.Done() <-ready - val, err := kw.WatchKey(runnerKey, tc.watchValue, time.Second) + val, err := kw.WatchKey(ctx, runnerKey, tc.watchValue, time.Second) require.NoError(t, err, "Expected no error") require.Equal(t, tc.expectedStatus, val, "Expected value") @@ -273,7 +276,7 @@ func TestShutdown(t *testing.T) { go func() { defer wg.Done() - val, err := kw.WatchKey(runnerKey, "something", 10*time.Second) + val, err := kw.WatchKey(ctx, runnerKey, "something", 10*time.Second) require.NoError(t, err, "Expected no error") require.Equal(t, WatchKeyStatusNoChange, val, "Expected value not to change") @@ -295,7 +298,7 @@ func TestShutdown(t *testing.T) { var err error done := make(chan struct{}) go func() { - val, err = kw.WatchKey(runnerKey, "something", 10*time.Second) + val, err = kw.WatchKey(ctx, runnerKey, "something", 10*time.Second) close(done) }() diff --git a/workhorse/internal/upstream/routes.go b/workhorse/internal/upstream/routes.go index dd1725a723e..f1b4199b87f 100644 --- a/workhorse/internal/upstream/routes.go +++ b/workhorse/internal/upstream/routes.go @@ -285,6 +285,9 @@ func configureRoutes(u *upstream) { // NuGet Artifact Repository u.route("PUT", apiProjectPattern+`/packages/nuget/`, mimeMultipartUploader), + // NuGet v2 Artifact Repository + u.route("PUT", apiProjectPattern+`/packages/nuget/v2`, mimeMultipartUploader), + // PyPI Artifact Repository u.route("POST", apiProjectPattern+`/packages/pypi`, mimeMultipartUploader), @@ -391,6 +394,7 @@ func configureRoutes(u *upstream) { u.route("GET", geoGitProjectPattern+`info/refs\z`, git.GetInfoRefsHandler(api)), u.route("POST", geoGitProjectPattern+`git-upload-pack\z`, contentEncodingHandler(git.UploadPack(api)), withMatcher(isContentType("application/x-git-upload-pack-request"))), u.route("GET", geoGitProjectPattern+`gitlab-lfs/objects/([0-9a-f]{64})\z`, defaultUpstream), + u.route("POST", geoGitProjectPattern+`info/lfs/objects/batch\z`, defaultUpstream), // Serve health checks from this Geo secondary u.route("", "^/-/(readiness|liveness)$", static.DeployPage(probeUpstream)), diff --git a/workhorse/internal/upstream/routes_test.go b/workhorse/internal/upstream/routes_test.go index 13c000bf791..09551b7f605 100644 --- a/workhorse/internal/upstream/routes_test.go +++ b/workhorse/internal/upstream/routes_test.go @@ -1,6 +1,7 @@ package upstream import ( + "bytes" "testing" "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper" @@ -86,3 +87,13 @@ func TestAssetsServedLocallyWithGeoProxy(t *testing.T) { runTestCasesWithGeoProxyEnabled(t, testCases) } + +func TestLfsBatchSecondaryGitSSHPullWithGeoProxy(t *testing.T) { + body := bytes.NewBuffer([]byte(`{"operation":"download","objects": [{"oid":"fakeoid", "size":10}], "transfers":["basic", "ssh","lfs-standalone-file"],"ref":{"name":"refs/heads/fakeref"},"hash_algo":"sha256"}`)) + contentType := "application/vnd.git-lfs+json; charset=utf-8" + testCases := []testCasePost{ + {testCase{"GitLab Shell call to /group/project.git/info/lfs/objects/batch", "/group/project.git/info/lfs/objects/batch", "Local Rails server received request to path /group/project.git/info/lfs/objects/batch"}, contentType, body}, + } + + runTestCasesWithGeoProxyEnabledPost(t, testCases) +} diff --git a/workhorse/internal/upstream/upstream_test.go b/workhorse/internal/upstream/upstream_test.go index 705e40c74d5..02da6baa8bf 100644 --- a/workhorse/internal/upstream/upstream_test.go +++ b/workhorse/internal/upstream/upstream_test.go @@ -30,6 +30,12 @@ type testCase struct { expectedResponse string } +type testCasePost struct { + test testCase + contentType string + body io.Reader +} + func TestMain(m *testing.M) { // Secret should be configured before any Geo API poll happens to prevent // race conditions where the first API call happens without a secret path @@ -345,6 +351,24 @@ func runTestCases(t *testing.T, ws *httptest.Server, testCases []testCase) { } } +func runTestCasesPost(t *testing.T, ws *httptest.Server, testCases []testCasePost) { + t.Helper() + for _, tc := range testCases { + t.Run(tc.test.desc, func(t *testing.T) { + + resp, err := http.Post(ws.URL+tc.test.path, tc.contentType, tc.body) + require.NoError(t, err) + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + require.Equal(t, 200, resp.StatusCode, "response code") + require.Equal(t, tc.test.expectedResponse, string(body)) + }) + } +} + func runTestCasesWithGeoProxyEnabled(t *testing.T, testCases []testCase) { remoteServer, rsDeferredClose := startRemoteServer("Geo primary") defer rsDeferredClose() @@ -359,6 +383,20 @@ func runTestCasesWithGeoProxyEnabled(t *testing.T, testCases []testCase) { runTestCases(t, ws, testCases) } +func runTestCasesWithGeoProxyEnabledPost(t *testing.T, testCases []testCasePost) { + remoteServer, rsDeferredClose := startRemoteServer("Geo primary") + defer rsDeferredClose() + + geoProxyEndpointResponseBody := fmt.Sprintf(`{"geo_enabled":true,"geo_proxy_url":"%v"}`, remoteServer.URL) + railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody) + defer deferredClose() + + ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, true) + defer wsDeferredClose() + + runTestCasesPost(t, ws, testCases) +} + func newUpstreamConfig(authBackend string) *config.Config { return &config.Config{ Version: "123", |