Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'workhorse/internal')
-rw-r--r--workhorse/internal/builds/register.go9
-rw-r--r--workhorse/internal/builds/register_test.go3
-rw-r--r--workhorse/internal/redis/keywatcher.go3
-rw-r--r--workhorse/internal/redis/keywatcher_test.go13
-rw-r--r--workhorse/internal/upstream/routes.go4
-rw-r--r--workhorse/internal/upstream/routes_test.go11
-rw-r--r--workhorse/internal/upstream/upstream_test.go38
7 files changed, 70 insertions, 11 deletions
diff --git a/workhorse/internal/builds/register.go b/workhorse/internal/builds/register.go
index 0a2fe47ed7e..f45d03ab8ed 100644
--- a/workhorse/internal/builds/register.go
+++ b/workhorse/internal/builds/register.go
@@ -2,6 +2,7 @@ package builds
import (
"bytes"
+ "context"
"encoding/json"
"errors"
"io"
@@ -55,7 +56,7 @@ var (
type largeBodyError struct{ error }
-type WatchKeyHandler func(key, value string, timeout time.Duration) (redis.WatchKeyStatus, error)
+type WatchKeyHandler func(ctx context.Context, key, value string, timeout time.Duration) (redis.WatchKeyStatus, error)
type runnerRequest struct {
Token string `json:"token,omitempty"`
@@ -102,11 +103,11 @@ func proxyRegisterRequest(h http.Handler, w http.ResponseWriter, r *http.Request
h.ServeHTTP(w, r)
}
-func watchForRunnerChange(watchHandler WatchKeyHandler, token, lastUpdate string, duration time.Duration) (redis.WatchKeyStatus, error) {
+func watchForRunnerChange(ctx context.Context, watchHandler WatchKeyHandler, token, lastUpdate string, duration time.Duration) (redis.WatchKeyStatus, error) {
registerHandlerOpenAtWatching.Inc()
defer registerHandlerOpenAtWatching.Dec()
- return watchHandler(runnerBuildQueue+token, lastUpdate, duration)
+ return watchHandler(ctx, runnerBuildQueue+token, lastUpdate, duration)
}
func RegisterHandler(h http.Handler, watchHandler WatchKeyHandler, pollingDuration time.Duration) http.Handler {
@@ -140,7 +141,7 @@ func RegisterHandler(h http.Handler, watchHandler WatchKeyHandler, pollingDurati
return
}
- result, err := watchForRunnerChange(watchHandler, runnerRequest.Token,
+ result, err := watchForRunnerChange(r.Context(), watchHandler, runnerRequest.Token,
runnerRequest.LastUpdate, pollingDuration)
if err != nil {
registerHandlerWatchErrors.Inc()
diff --git a/workhorse/internal/builds/register_test.go b/workhorse/internal/builds/register_test.go
index d5cbebd500b..97d66517ac9 100644
--- a/workhorse/internal/builds/register_test.go
+++ b/workhorse/internal/builds/register_test.go
@@ -2,6 +2,7 @@ package builds
import (
"bytes"
+ "context"
"errors"
"io"
"net/http"
@@ -71,7 +72,7 @@ func TestRegisterHandlerMissingData(t *testing.T) {
func expectWatcherToBeExecuted(t *testing.T, watchKeyStatus redis.WatchKeyStatus, watchKeyError error,
httpStatus int, msgAndArgs ...interface{}) {
executed := false
- watchKeyHandler := func(key, value string, timeout time.Duration) (redis.WatchKeyStatus, error) {
+ watchKeyHandler := func(ctx context.Context, key, value string, timeout time.Duration) (redis.WatchKeyStatus, error) {
executed = true
return watchKeyStatus, watchKeyError
}
diff --git a/workhorse/internal/redis/keywatcher.go b/workhorse/internal/redis/keywatcher.go
index cdf6ccd7e83..2fd0753c3c9 100644
--- a/workhorse/internal/redis/keywatcher.go
+++ b/workhorse/internal/redis/keywatcher.go
@@ -1,6 +1,7 @@
package redis
import (
+ "context"
"errors"
"fmt"
"strings"
@@ -251,7 +252,7 @@ const (
WatchKeyStatusNoChange
)
-func (kw *KeyWatcher) WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error) {
+func (kw *KeyWatcher) WatchKey(_ context.Context, key, value string, timeout time.Duration) (WatchKeyStatus, error) {
notify := make(chan string, 1)
if err := kw.addSubscription(key, notify); err != nil {
return WatchKeyStatusNoChange, err
diff --git a/workhorse/internal/redis/keywatcher_test.go b/workhorse/internal/redis/keywatcher_test.go
index bae49d81bb1..3abc1bf1107 100644
--- a/workhorse/internal/redis/keywatcher_test.go
+++ b/workhorse/internal/redis/keywatcher_test.go
@@ -1,6 +1,7 @@
package redis
import (
+ "context"
"sync"
"testing"
"time"
@@ -10,6 +11,8 @@ import (
"github.com/stretchr/testify/require"
)
+var ctx = context.Background()
+
const (
runnerKey = "runner:build_queue:10"
)
@@ -131,7 +134,7 @@ func TestKeyChangesInstantReturn(t *testing.T) {
defer kw.Shutdown()
kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()}
- val, err := kw.WatchKey(runnerKey, tc.watchValue, tc.timeout)
+ val, err := kw.WatchKey(ctx, runnerKey, tc.watchValue, tc.timeout)
require.NoError(t, err, "Expected no error")
require.Equal(t, tc.expectedStatus, val, "Expected value")
@@ -187,7 +190,7 @@ func TestKeyChangesWhenWatching(t *testing.T) {
go func() {
defer wg.Done()
<-ready
- val, err := kw.WatchKey(runnerKey, tc.watchValue, time.Second)
+ val, err := kw.WatchKey(ctx, runnerKey, tc.watchValue, time.Second)
require.NoError(t, err, "Expected no error")
require.Equal(t, tc.expectedStatus, val, "Expected value")
@@ -245,7 +248,7 @@ func TestKeyChangesParallel(t *testing.T) {
go func() {
defer wg.Done()
<-ready
- val, err := kw.WatchKey(runnerKey, tc.watchValue, time.Second)
+ val, err := kw.WatchKey(ctx, runnerKey, tc.watchValue, time.Second)
require.NoError(t, err, "Expected no error")
require.Equal(t, tc.expectedStatus, val, "Expected value")
@@ -273,7 +276,7 @@ func TestShutdown(t *testing.T) {
go func() {
defer wg.Done()
- val, err := kw.WatchKey(runnerKey, "something", 10*time.Second)
+ val, err := kw.WatchKey(ctx, runnerKey, "something", 10*time.Second)
require.NoError(t, err, "Expected no error")
require.Equal(t, WatchKeyStatusNoChange, val, "Expected value not to change")
@@ -295,7 +298,7 @@ func TestShutdown(t *testing.T) {
var err error
done := make(chan struct{})
go func() {
- val, err = kw.WatchKey(runnerKey, "something", 10*time.Second)
+ val, err = kw.WatchKey(ctx, runnerKey, "something", 10*time.Second)
close(done)
}()
diff --git a/workhorse/internal/upstream/routes.go b/workhorse/internal/upstream/routes.go
index dd1725a723e..f1b4199b87f 100644
--- a/workhorse/internal/upstream/routes.go
+++ b/workhorse/internal/upstream/routes.go
@@ -285,6 +285,9 @@ func configureRoutes(u *upstream) {
// NuGet Artifact Repository
u.route("PUT", apiProjectPattern+`/packages/nuget/`, mimeMultipartUploader),
+ // NuGet v2 Artifact Repository
+ u.route("PUT", apiProjectPattern+`/packages/nuget/v2`, mimeMultipartUploader),
+
// PyPI Artifact Repository
u.route("POST", apiProjectPattern+`/packages/pypi`, mimeMultipartUploader),
@@ -391,6 +394,7 @@ func configureRoutes(u *upstream) {
u.route("GET", geoGitProjectPattern+`info/refs\z`, git.GetInfoRefsHandler(api)),
u.route("POST", geoGitProjectPattern+`git-upload-pack\z`, contentEncodingHandler(git.UploadPack(api)), withMatcher(isContentType("application/x-git-upload-pack-request"))),
u.route("GET", geoGitProjectPattern+`gitlab-lfs/objects/([0-9a-f]{64})\z`, defaultUpstream),
+ u.route("POST", geoGitProjectPattern+`info/lfs/objects/batch\z`, defaultUpstream),
// Serve health checks from this Geo secondary
u.route("", "^/-/(readiness|liveness)$", static.DeployPage(probeUpstream)),
diff --git a/workhorse/internal/upstream/routes_test.go b/workhorse/internal/upstream/routes_test.go
index 13c000bf791..09551b7f605 100644
--- a/workhorse/internal/upstream/routes_test.go
+++ b/workhorse/internal/upstream/routes_test.go
@@ -1,6 +1,7 @@
package upstream
import (
+ "bytes"
"testing"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
@@ -86,3 +87,13 @@ func TestAssetsServedLocallyWithGeoProxy(t *testing.T) {
runTestCasesWithGeoProxyEnabled(t, testCases)
}
+
+func TestLfsBatchSecondaryGitSSHPullWithGeoProxy(t *testing.T) {
+ body := bytes.NewBuffer([]byte(`{"operation":"download","objects": [{"oid":"fakeoid", "size":10}], "transfers":["basic", "ssh","lfs-standalone-file"],"ref":{"name":"refs/heads/fakeref"},"hash_algo":"sha256"}`))
+ contentType := "application/vnd.git-lfs+json; charset=utf-8"
+ testCases := []testCasePost{
+ {testCase{"GitLab Shell call to /group/project.git/info/lfs/objects/batch", "/group/project.git/info/lfs/objects/batch", "Local Rails server received request to path /group/project.git/info/lfs/objects/batch"}, contentType, body},
+ }
+
+ runTestCasesWithGeoProxyEnabledPost(t, testCases)
+}
diff --git a/workhorse/internal/upstream/upstream_test.go b/workhorse/internal/upstream/upstream_test.go
index 705e40c74d5..02da6baa8bf 100644
--- a/workhorse/internal/upstream/upstream_test.go
+++ b/workhorse/internal/upstream/upstream_test.go
@@ -30,6 +30,12 @@ type testCase struct {
expectedResponse string
}
+type testCasePost struct {
+ test testCase
+ contentType string
+ body io.Reader
+}
+
func TestMain(m *testing.M) {
// Secret should be configured before any Geo API poll happens to prevent
// race conditions where the first API call happens without a secret path
@@ -345,6 +351,24 @@ func runTestCases(t *testing.T, ws *httptest.Server, testCases []testCase) {
}
}
+func runTestCasesPost(t *testing.T, ws *httptest.Server, testCases []testCasePost) {
+ t.Helper()
+ for _, tc := range testCases {
+ t.Run(tc.test.desc, func(t *testing.T) {
+
+ resp, err := http.Post(ws.URL+tc.test.path, tc.contentType, tc.body)
+ require.NoError(t, err)
+ defer resp.Body.Close()
+
+ body, err := io.ReadAll(resp.Body)
+ require.NoError(t, err)
+
+ require.Equal(t, 200, resp.StatusCode, "response code")
+ require.Equal(t, tc.test.expectedResponse, string(body))
+ })
+ }
+}
+
func runTestCasesWithGeoProxyEnabled(t *testing.T, testCases []testCase) {
remoteServer, rsDeferredClose := startRemoteServer("Geo primary")
defer rsDeferredClose()
@@ -359,6 +383,20 @@ func runTestCasesWithGeoProxyEnabled(t *testing.T, testCases []testCase) {
runTestCases(t, ws, testCases)
}
+func runTestCasesWithGeoProxyEnabledPost(t *testing.T, testCases []testCasePost) {
+ remoteServer, rsDeferredClose := startRemoteServer("Geo primary")
+ defer rsDeferredClose()
+
+ geoProxyEndpointResponseBody := fmt.Sprintf(`{"geo_enabled":true,"geo_proxy_url":"%v"}`, remoteServer.URL)
+ railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody)
+ defer deferredClose()
+
+ ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, true)
+ defer wsDeferredClose()
+
+ runTestCasesPost(t, ws, testCases)
+}
+
func newUpstreamConfig(authBackend string) *config.Config {
return &config.Config{
Version: "123",