gitlab.com/gitlab-org/gitlab-foss.git

Diffstat (limited to 'workhorse/internal/upstream')

 workhorse/internal/upstream/development_test.go                |  39 +
 workhorse/internal/upstream/handlers.go                        |  39 +
 workhorse/internal/upstream/handlers_test.go                   |  67 +
 workhorse/internal/upstream/metrics.go                         | 117 +
 workhorse/internal/upstream/notfoundunless.go                  |  11 +
 workhorse/internal/upstream/roundtripper/roundtripper.go       |  61 +
 workhorse/internal/upstream/roundtripper/roundtripper_test.go  |  39 +
 workhorse/internal/upstream/roundtripper/transport.go          |  27 +
 workhorse/internal/upstream/routes.go                          | 345 +
 workhorse/internal/upstream/upstream.go                        | 123 +
 10 files changed, 868 insertions(+), 0 deletions(-)
diff --git a/workhorse/internal/upstream/development_test.go b/workhorse/internal/upstream/development_test.go
new file mode 100644
index 00000000000..d2957abb18b
--- /dev/null
+++ b/workhorse/internal/upstream/development_test.go
@@ -0,0 +1,39 @@
+package upstream
+
+import (
+ "net/http"
+ "net/http/httptest"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestDevelopmentModeEnabled(t *testing.T) {
+ developmentMode := true
+
+ r, _ := http.NewRequest("GET", "/something", nil)
+ w := httptest.NewRecorder()
+
+ executed := false
+ NotFoundUnless(developmentMode, http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {
+ executed = true
+ })).ServeHTTP(w, r)
+
+ require.True(t, executed, "The handler should get executed")
+}
+
+func TestDevelopmentModeDisabled(t *testing.T) {
+ developmentMode := false
+
+ r, _ := http.NewRequest("GET", "/something", nil)
+ w := httptest.NewRecorder()
+
+ executed := false
+ NotFoundUnless(developmentMode, http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {
+ executed = true
+ })).ServeHTTP(w, r)
+
+ require.False(t, executed, "The handler should not get executed")
+
+ require.Equal(t, 404, w.Code)
+}
diff --git a/workhorse/internal/upstream/handlers.go b/workhorse/internal/upstream/handlers.go
new file mode 100644
index 00000000000..a6aa148a4ae
--- /dev/null
+++ b/workhorse/internal/upstream/handlers.go
@@ -0,0 +1,39 @@
+package upstream
+
+import (
+ "compress/gzip"
+ "fmt"
+ "io"
+ "net/http"
+
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
+)
+
+func contentEncodingHandler(h http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ var body io.ReadCloser
+ var err error
+
+ // The client request body may have been gzipped.
+ contentEncoding := r.Header.Get("Content-Encoding")
+ switch contentEncoding {
+ case "":
+ body = r.Body
+ case "gzip":
+ body, err = gzip.NewReader(r.Body)
+ default:
+ err = fmt.Errorf("unsupported content encoding: %s", contentEncoding)
+ }
+
+ if err != nil {
+ helper.Fail500(w, r, fmt.Errorf("contentEncodingHandler: %v", err))
+ return
+ }
+ defer body.Close()
+
+ r.Body = body
+ r.Header.Del("Content-Encoding")
+
+ h.ServeHTTP(w, r)
+ })
+}
diff --git a/workhorse/internal/upstream/handlers_test.go b/workhorse/internal/upstream/handlers_test.go
new file mode 100644
index 00000000000..10c7479f5c5
--- /dev/null
+++ b/workhorse/internal/upstream/handlers_test.go
@@ -0,0 +1,67 @@
+package upstream
+
+import (
+ "bytes"
+ "compress/gzip"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestGzipEncoding(t *testing.T) {
+ resp := httptest.NewRecorder()
+
+ var b bytes.Buffer
+ w := gzip.NewWriter(&b)
+ fmt.Fprint(w, "test")
+ w.Close()
+
+ body := ioutil.NopCloser(&b)
+
+ req, err := http.NewRequest("POST", "http://address/test", body)
+ require.NoError(t, err)
+ req.Header.Set("Content-Encoding", "gzip")
+
+ contentEncodingHandler(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) {
+ require.IsType(t, &gzip.Reader{}, r.Body, "body type")
+ require.Empty(t, r.Header.Get("Content-Encoding"), "Content-Encoding should be deleted")
+ })).ServeHTTP(resp, req)
+
+ require.Equal(t, 200, resp.Code)
+}
+
+func TestNoEncoding(t *testing.T) {
+ resp := httptest.NewRecorder()
+
+ var b bytes.Buffer
+ body := ioutil.NopCloser(&b)
+
+ req, err := http.NewRequest("POST", "http://address/test", body)
+ require.NoError(t, err)
+ req.Header.Set("Content-Encoding", "")
+
+ contentEncodingHandler(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) {
+ require.Equal(t, body, r.Body, "Expected the same body")
+ require.Empty(t, r.Header.Get("Content-Encoding"), "Content-Encoding should be deleted")
+ })).ServeHTTP(resp, req)
+
+ require.Equal(t, 200, resp.Code)
+}
+
+func TestInvalidEncoding(t *testing.T) {
+ resp := httptest.NewRecorder()
+
+ req, err := http.NewRequest("POST", "http://address/test", nil)
+ require.NoError(t, err)
+ req.Header.Set("Content-Encoding", "application/unknown")
+
+ contentEncodingHandler(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {
+ t.Fatal("it shouldn't be executed")
+ })).ServeHTTP(resp, req)
+
+ require.Equal(t, 500, resp.Code)
+}
diff --git a/workhorse/internal/upstream/metrics.go b/workhorse/internal/upstream/metrics.go
new file mode 100644
index 00000000000..38528056d43
--- /dev/null
+++ b/workhorse/internal/upstream/metrics.go
@@ -0,0 +1,117 @@
+package upstream
+
+import (
+ "net/http"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+const (
+ namespace = "gitlab_workhorse"
+ httpSubsystem = "http"
+)
+
+func secondsDurationBuckets() []float64 {
+ return []float64{
+ 0.005, /* 5ms */
+ 0.025, /* 25ms */
+ 0.1, /* 100ms */
+ 0.5, /* 500ms */
+ 1.0, /* 1s */
+ 10.0, /* 10s */
+ 30.0, /* 30s */
+ 60.0, /* 1m */
+ 300.0, /* 5m */
+ }
+}
+
+func byteSizeBuckets() []float64 {
+ return []float64{
+ 10,
+ 64,
+ 256,
+ 1024, /* 1kB */
+ 64 * 1024, /* 64kB */
+ 256 * 1024, /* 256kB */
+ 1024 * 1024, /* 1MB */
+ 64 * 1024 * 1024, /* 64MB */
+ }
+}
+
+var (
+ httpInFlightRequests = promauto.NewGauge(prometheus.GaugeOpts{
+ Namespace: namespace,
+ Subsystem: httpSubsystem,
+ Name: "in_flight_requests",
+ Help: "A gauge of requests currently being served by workhorse.",
+ })
+
+ httpRequestsTotal = promauto.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: namespace,
+ Subsystem: httpSubsystem,
+ Name: "requests_total",
+ Help: "A counter for requests to workhorse.",
+ },
+ []string{"code", "method", "route"},
+ )
+
+ httpRequestDurationSeconds = promauto.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: namespace,
+ Subsystem: httpSubsystem,
+ Name: "request_duration_seconds",
+ Help: "A histogram of latencies for requests to workhorse.",
+ Buckets: secondsDurationBuckets(),
+ },
+ []string{"code", "method", "route"},
+ )
+
+ httpRequestSizeBytes = promauto.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: namespace,
+ Subsystem: httpSubsystem,
+ Name: "request_size_bytes",
+ Help: "A histogram of sizes of requests to workhorse.",
+ Buckets: byteSizeBuckets(),
+ },
+ []string{"code", "method", "route"},
+ )
+
+ httpResponseSizeBytes = promauto.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: namespace,
+ Subsystem: httpSubsystem,
+ Name: "response_size_bytes",
+ Help: "A histogram of response sizes for requests to workhorse.",
+ Buckets: byteSizeBuckets(),
+ },
+ []string{"code", "method", "route"},
+ )
+
+ httpTimeToWriteHeaderSeconds = promauto.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: namespace,
+ Subsystem: httpSubsystem,
+ Name: "time_to_write_header_seconds",
+ Help: "A histogram of request durations until the response headers are written.",
+ Buckets: secondsDurationBuckets(),
+ },
+ []string{"code", "method", "route"},
+ )
+)
+
+func instrumentRoute(next http.Handler, method string, regexpStr string) http.Handler {
+ handler := next
+
+ handler = promhttp.InstrumentHandlerCounter(httpRequestsTotal.MustCurryWith(map[string]string{"route": regexpStr}), handler)
+ handler = promhttp.InstrumentHandlerDuration(httpRequestDurationSeconds.MustCurryWith(map[string]string{"route": regexpStr}), handler)
+ handler = promhttp.InstrumentHandlerInFlight(httpInFlightRequests, handler)
+ handler = promhttp.InstrumentHandlerRequestSize(httpRequestSizeBytes.MustCurryWith(map[string]string{"route": regexpStr}), handler)
+ handler = promhttp.InstrumentHandlerResponseSize(httpResponseSizeBytes.MustCurryWith(map[string]string{"route": regexpStr}), handler)
+ handler = promhttp.InstrumentHandlerTimeToWriteHeader(httpTimeToWriteHeaderSeconds.MustCurryWith(map[string]string{"route": regexpStr}), handler)
+
+ return handler
+}
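
For context, an illustrative sketch (not part of the diff above) of how instrumentRoute can be combined with the standard promhttp handler so the counters and histograms registered here become scrapeable. The /-/hello route, its pattern string, the /metrics mount point, and the exampleMetricsMux helper are assumptions for the example, not something this diff configures.

package upstream

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// exampleMetricsMux is a hypothetical helper: it wraps a trivial handler with
// the instrumentation defined above and exposes the default Prometheus
// registry (where promauto registers these collectors) on /metrics.
func exampleMetricsMux() *http.ServeMux {
	hello := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.Write([]byte("ok"))
	})

	mux := http.NewServeMux()
	// Use the same string for the route label that would be used to match it.
	mux.Handle("/-/hello", instrumentRoute(hello, "GET", `^/-/hello\z`))
	mux.Handle("/metrics", promhttp.Handler())
	return mux
}
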
diff --git a/workhorse/internal/upstream/notfoundunless.go b/workhorse/internal/upstream/notfoundunless.go
new file mode 100644
index 00000000000..3bbe3e873a4
--- /dev/null
+++ b/workhorse/internal/upstream/notfoundunless.go
@@ -0,0 +1,11 @@
+package upstream
+
+import "net/http"
+
+func NotFoundUnless(pass bool, handler http.Handler) http.Handler {
+ if pass {
+ return handler
+ }
+
+ return http.HandlerFunc(http.NotFound)
+}
diff --git a/workhorse/internal/upstream/roundtripper/roundtripper.go b/workhorse/internal/upstream/roundtripper/roundtripper.go
new file mode 100644
index 00000000000..84f1983b471
--- /dev/null
+++ b/workhorse/internal/upstream/roundtripper/roundtripper.go
@@ -0,0 +1,61 @@
+package roundtripper
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "net/http"
+ "net/url"
+ "time"
+
+ "gitlab.com/gitlab-org/labkit/correlation"
+ "gitlab.com/gitlab-org/labkit/tracing"
+
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/badgateway"
+)
+
+func mustParseAddress(address, scheme string) string {
+ if scheme == "https" {
+ panic("TLS is not supported for backend connections")
+ }
+
+ for _, suffix := range []string{"", ":" + scheme} {
+ address += suffix
+ if host, port, err := net.SplitHostPort(address); err == nil && host != "" && port != "" {
+ return host + ":" + port
+ }
+ }
+
+ panic(fmt.Errorf("could not parse host:port from address %q and scheme %q", address, scheme))
+}
+
+// NewBackendRoundTripper returns a new RoundTripper instance using the provided values
+func NewBackendRoundTripper(backend *url.URL, socket string, proxyHeadersTimeout time.Duration, developmentMode bool) http.RoundTripper {
+ // Copied from the definition of http.DefaultTransport. We can't literally copy http.DefaultTransport because of its hidden internal state.
+ transport, dialer := newBackendTransport()
+ transport.ResponseHeaderTimeout = proxyHeadersTimeout
+
+ if backend != nil && socket == "" {
+ address := mustParseAddress(backend.Host, backend.Scheme)
+ transport.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
+ return dialer.DialContext(ctx, "tcp", address)
+ }
+ } else if socket != "" {
+ transport.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
+ return dialer.DialContext(ctx, "unix", socket)
+ }
+ } else {
+ panic("backend is nil and socket is empty")
+ }
+
+ return tracing.NewRoundTripper(
+ correlation.NewInstrumentedRoundTripper(
+ badgateway.NewRoundTripper(developmentMode, transport),
+ ),
+ )
+}
+
+// NewTestBackendRoundTripper sets up a RoundTripper for testing purposes
+func NewTestBackendRoundTripper(backend *url.URL) http.RoundTripper {
+ return NewBackendRoundTripper(backend, "", 0, true)
+}
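
A minimal usage sketch (not part of the diff), assuming it runs inside the workhorse module (these internal packages cannot be imported from elsewhere) and that a backend listens on localhost:8080; the timeout value and the health-check path are illustrative.

package main

import (
	"fmt"
	"net/http"
	"net/url"
	"time"

	"gitlab.com/gitlab-org/gitlab-workhorse/internal/upstream/roundtripper"
)

func main() {
	backend, err := url.Parse("http://localhost:8080")
	if err != nil {
		panic(err)
	}

	// An empty socket string selects the TCP dialer; a non-empty one would
	// send every request to that Unix socket instead.
	rt := roundtripper.NewBackendRoundTripper(backend, "", 1*time.Minute, false)

	client := &http.Client{Transport: rt}
	resp, err := client.Get("http://localhost:8080/-/health")
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("backend responded with", resp.Status)
}
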
diff --git a/workhorse/internal/upstream/roundtripper/roundtripper_test.go b/workhorse/internal/upstream/roundtripper/roundtripper_test.go
new file mode 100644
index 00000000000..79ffa244918
--- /dev/null
+++ b/workhorse/internal/upstream/roundtripper/roundtripper_test.go
@@ -0,0 +1,39 @@
+package roundtripper
+
+import (
+ "strconv"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestMustParseAddress(t *testing.T) {
+ successExamples := []struct{ address, scheme, expected string }{
+ {"1.2.3.4:56", "http", "1.2.3.4:56"},
+ {"[::1]:23", "http", "::1:23"},
+ {"4.5.6.7", "http", "4.5.6.7:http"},
+ }
+ for i, example := range successExamples {
+ t.Run(strconv.Itoa(i), func(t *testing.T) {
+ require.Equal(t, example.expected, mustParseAddress(example.address, example.scheme))
+ })
+ }
+}
+
+func TestMustParseAddressPanic(t *testing.T) {
+ panicExamples := []struct{ address, scheme string }{
+ {"1.2.3.4", ""},
+ {"1.2.3.4", "https"},
+ }
+
+ for i, panicExample := range panicExamples {
+ t.Run(strconv.Itoa(i), func(t *testing.T) {
+ defer func() {
+ if r := recover(); r == nil {
+ t.Fatal("expected panic")
+ }
+ }()
+ mustParseAddress(panicExample.address, panicExample.scheme)
+ })
+ }
+}
diff --git a/workhorse/internal/upstream/roundtripper/transport.go b/workhorse/internal/upstream/roundtripper/transport.go
new file mode 100644
index 00000000000..84d9623b129
--- /dev/null
+++ b/workhorse/internal/upstream/roundtripper/transport.go
@@ -0,0 +1,27 @@
+package roundtripper
+
+import (
+ "net"
+ "net/http"
+ "time"
+)
+
+// newBackendTransport sets up the default HTTP transport that Workhorse uses
+// to communicate with the upstream
+func newBackendTransport() (*http.Transport, *net.Dialer) {
+ dialler := &net.Dialer{
+ Timeout: 30 * time.Second,
+ KeepAlive: 30 * time.Second,
+ }
+
+ transport := &http.Transport{
+ Proxy: http.ProxyFromEnvironment,
+ DialContext: dialler.DialContext,
+ MaxIdleConns: 100,
+ IdleConnTimeout: 90 * time.Second,
+ TLSHandshakeTimeout: 10 * time.Second,
+ ExpectContinueTimeout: 1 * time.Second,
+ }
+
+ return transport, dialler
+}
diff --git a/workhorse/internal/upstream/routes.go b/workhorse/internal/upstream/routes.go
new file mode 100644
index 00000000000..5bbd245719b
--- /dev/null
+++ b/workhorse/internal/upstream/routes.go
@@ -0,0 +1,345 @@
+package upstream
+
+import (
+ "net/http"
+ "net/url"
+ "path"
+ "regexp"
+
+ "github.com/gorilla/websocket"
+
+ "gitlab.com/gitlab-org/labkit/log"
+ "gitlab.com/gitlab-org/labkit/tracing"
+
+ apipkg "gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/artifacts"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/builds"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/channel"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/git"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/imageresizer"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/lfs"
+ proxypkg "gitlab.com/gitlab-org/gitlab-workhorse/internal/proxy"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/queueing"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/redis"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/secret"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/senddata"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/sendfile"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/sendurl"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/staticpages"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/upload"
+)
+
+type matcherFunc func(*http.Request) bool
+
+type routeEntry struct {
+ method string
+ regex *regexp.Regexp
+ handler http.Handler
+ matchers []matcherFunc
+}
+
+type routeOptions struct {
+ tracing bool
+ matchers []matcherFunc
+}
+
+type uploadPreparers struct {
+ artifacts upload.Preparer
+ lfs upload.Preparer
+ packages upload.Preparer
+ uploads upload.Preparer
+}
+
+const (
+ apiPattern = `^/api/`
+ ciAPIPattern = `^/ci/api/`
+ gitProjectPattern = `^/([^/]+/){1,}[^/]+\.git/`
+ projectPattern = `^/([^/]+/){1,}[^/]+/`
+ snippetUploadPattern = `^/uploads/personal_snippet`
+ userUploadPattern = `^/uploads/user`
+ importPattern = `^/import/`
+)
+
+func compileRegexp(regexpStr string) *regexp.Regexp {
+ if len(regexpStr) == 0 {
+ return nil
+ }
+
+ return regexp.MustCompile(regexpStr)
+}
+
+func withMatcher(f matcherFunc) func(*routeOptions) {
+ return func(options *routeOptions) {
+ options.matchers = append(options.matchers, f)
+ }
+}
+
+func withoutTracing() func(*routeOptions) {
+ return func(options *routeOptions) {
+ options.tracing = false
+ }
+}
+
+func (u *upstream) observabilityMiddlewares(handler http.Handler, method string, regexpStr string) http.Handler {
+ handler = log.AccessLogger(
+ handler,
+ log.WithAccessLogger(u.accessLogger),
+ log.WithExtraFields(func(r *http.Request) log.Fields {
+ return log.Fields{
+ "route": regexpStr, // This field matches the `route` label in Prometheus metrics
+ }
+ }),
+ )
+
+ handler = instrumentRoute(handler, method, regexpStr) // Add prometheus metrics
+ return handler
+}
+
+func (u *upstream) route(method, regexpStr string, handler http.Handler, opts ...func(*routeOptions)) routeEntry {
+ // Instantiate a route with the defaults
+ options := routeOptions{
+ tracing: true,
+ }
+
+ for _, f := range opts {
+ f(&options)
+ }
+
+ handler = u.observabilityMiddlewares(handler, method, regexpStr)
+ handler = denyWebsocket(handler) // Disallow websockets
+ if options.tracing {
+ // Add distributed tracing
+ handler = tracing.Handler(handler, tracing.WithRouteIdentifier(regexpStr))
+ }
+
+ return routeEntry{
+ method: method,
+ regex: compileRegexp(regexpStr),
+ handler: handler,
+ matchers: options.matchers,
+ }
+}
+
+func (u *upstream) wsRoute(regexpStr string, handler http.Handler, matchers ...matcherFunc) routeEntry {
+ method := "GET"
+ handler = u.observabilityMiddlewares(handler, method, regexpStr)
+
+ return routeEntry{
+ method: method,
+ regex: compileRegexp(regexpStr),
+ handler: handler,
+ matchers: append(matchers, websocket.IsWebSocketUpgrade),
+ }
+}
+
+// isContentType creates a matcherFunc for a particular content type.
+func isContentType(contentType string) func(*http.Request) bool {
+ return func(r *http.Request) bool {
+ return helper.IsContentType(contentType, r.Header.Get("Content-Type"))
+ }
+}
+
+func (ro *routeEntry) isMatch(cleanedPath string, req *http.Request) bool {
+ if ro.method != "" && req.Method != ro.method {
+ return false
+ }
+
+ if ro.regex != nil && !ro.regex.MatchString(cleanedPath) {
+ return false
+ }
+
+ ok := true
+ for _, matcher := range ro.matchers {
+ ok = matcher(req)
+ if !ok {
+ break
+ }
+ }
+
+ return ok
+}
+
+func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg config.Config) http.Handler {
+ proxier := proxypkg.NewProxy(backend, version, rt)
+
+ return senddata.SendData(
+ sendfile.SendFile(apipkg.Block(proxier)),
+ git.SendArchive,
+ git.SendBlob,
+ git.SendDiff,
+ git.SendPatch,
+ git.SendSnapshot,
+ artifacts.SendEntry,
+ sendurl.SendURL,
+ imageresizer.NewResizer(cfg),
+ )
+}
+
+// Routing table
+// We match against URI not containing the relativeUrlRoot:
+// see upstream.ServeHTTP
+
+func (u *upstream) configureRoutes() {
+ api := apipkg.NewAPI(
+ u.Backend,
+ u.Version,
+ u.RoundTripper,
+ )
+
+ static := &staticpages.Static{DocumentRoot: u.DocumentRoot}
+ proxy := buildProxy(u.Backend, u.Version, u.RoundTripper, u.Config)
+ cableProxy := proxypkg.NewProxy(u.CableBackend, u.Version, u.CableRoundTripper)
+
+ assetsNotFoundHandler := NotFoundUnless(u.DevelopmentMode, proxy)
+ if u.AltDocumentRoot != "" {
+ altStatic := &staticpages.Static{DocumentRoot: u.AltDocumentRoot}
+ assetsNotFoundHandler = altStatic.ServeExisting(
+ u.URLPrefix,
+ staticpages.CacheExpireMax,
+ NotFoundUnless(u.DevelopmentMode, proxy),
+ )
+ }
+
+ signingTripper := secret.NewRoundTripper(u.RoundTripper, u.Version)
+ signingProxy := buildProxy(u.Backend, u.Version, signingTripper, u.Config)
+
+ preparers := createUploadPreparers(u.Config)
+ uploadPath := path.Join(u.DocumentRoot, "uploads/tmp")
+ uploadAccelerateProxy := upload.Accelerate(&upload.SkipRailsAuthorizer{TempPath: uploadPath}, proxy, preparers.uploads)
+ ciAPIProxyQueue := queueing.QueueRequests("ci_api_job_requests", uploadAccelerateProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout)
+ ciAPILongPolling := builds.RegisterHandler(ciAPIProxyQueue, redis.WatchKey, u.APICILongPollingDuration)
+
+ // Serve static files or forward the requests
+ defaultUpstream := static.ServeExisting(
+ u.URLPrefix,
+ staticpages.CacheDisabled,
+ static.DeployPage(static.ErrorPagesUnless(u.DevelopmentMode, staticpages.ErrorFormatHTML, uploadAccelerateProxy)),
+ )
+ probeUpstream := static.ErrorPagesUnless(u.DevelopmentMode, staticpages.ErrorFormatJSON, proxy)
+ healthUpstream := static.ErrorPagesUnless(u.DevelopmentMode, staticpages.ErrorFormatText, proxy)
+
+ u.Routes = []routeEntry{
+ // Git Clone
+ u.route("GET", gitProjectPattern+`info/refs\z`, git.GetInfoRefsHandler(api)),
+ u.route("POST", gitProjectPattern+`git-upload-pack\z`, contentEncodingHandler(git.UploadPack(api)), withMatcher(isContentType("application/x-git-upload-pack-request"))),
+ u.route("POST", gitProjectPattern+`git-receive-pack\z`, contentEncodingHandler(git.ReceivePack(api)), withMatcher(isContentType("application/x-git-receive-pack-request"))),
+ u.route("PUT", gitProjectPattern+`gitlab-lfs/objects/([0-9a-f]{64})/([0-9]+)\z`, lfs.PutStore(api, signingProxy, preparers.lfs), withMatcher(isContentType("application/octet-stream"))),
+
+ // CI Artifacts
+ u.route("POST", apiPattern+`v4/jobs/[0-9]+/artifacts\z`, contentEncodingHandler(artifacts.UploadArtifacts(api, signingProxy, preparers.artifacts))),
+ u.route("POST", ciAPIPattern+`v1/builds/[0-9]+/artifacts\z`, contentEncodingHandler(artifacts.UploadArtifacts(api, signingProxy, preparers.artifacts))),
+
+ // ActionCable websocket
+ u.wsRoute(`^/-/cable\z`, cableProxy),
+
+ // Terminal websocket
+ u.wsRoute(projectPattern+`-/environments/[0-9]+/terminal.ws\z`, channel.Handler(api)),
+ u.wsRoute(projectPattern+`-/jobs/[0-9]+/terminal.ws\z`, channel.Handler(api)),
+
+ // Proxy Job Services
+ u.wsRoute(projectPattern+`-/jobs/[0-9]+/proxy.ws\z`, channel.Handler(api)),
+
+ // Long poll and limit capacity given to jobs/request and builds/register.json
+ u.route("", apiPattern+`v4/jobs/request\z`, ciAPILongPolling),
+ u.route("", ciAPIPattern+`v1/builds/register.json\z`, ciAPILongPolling),
+
+ // Maven Artifact Repository
+ u.route("PUT", apiPattern+`v4/projects/[0-9]+/packages/maven/`, upload.BodyUploader(api, signingProxy, preparers.packages)),
+
+ // Conan Artifact Repository
+ u.route("PUT", apiPattern+`v4/packages/conan/`, upload.BodyUploader(api, signingProxy, preparers.packages)),
+ u.route("PUT", apiPattern+`v4/projects/[0-9]+/packages/conan/`, upload.BodyUploader(api, signingProxy, preparers.packages)),
+
+ // Generic Packages Repository
+ u.route("PUT", apiPattern+`v4/projects/[0-9]+/packages/generic/`, upload.BodyUploader(api, signingProxy, preparers.packages)),
+
+ // NuGet Artifact Repository
+ u.route("PUT", apiPattern+`v4/projects/[0-9]+/packages/nuget/`, upload.Accelerate(api, signingProxy, preparers.packages)),
+
+ // PyPI Artifact Repository
+ u.route("POST", apiPattern+`v4/projects/[0-9]+/packages/pypi`, upload.Accelerate(api, signingProxy, preparers.packages)),
+
+ // Debian Artifact Repository
+ u.route("PUT", apiPattern+`v4/projects/[0-9]+/-/packages/debian/incoming/`, upload.BodyUploader(api, signingProxy, preparers.packages)),
+
+ // We are porting the API to disk acceleration;
+ // we need to declare each route until we have fixed all the routes in the Rails codebase.
+ // Overall status can be seen at https://gitlab.com/groups/gitlab-org/-/epics/1802#current-status
+ u.route("POST", apiPattern+`v4/projects/[0-9]+/wikis/attachments\z`, uploadAccelerateProxy),
+ u.route("POST", apiPattern+`graphql\z`, uploadAccelerateProxy),
+ u.route("POST", apiPattern+`v4/groups/import`, upload.Accelerate(api, signingProxy, preparers.uploads)),
+ u.route("POST", apiPattern+`v4/projects/import`, upload.Accelerate(api, signingProxy, preparers.uploads)),
+
+ // Project Import via UI upload acceleration
+ u.route("POST", importPattern+`gitlab_project`, upload.Accelerate(api, signingProxy, preparers.uploads)),
+ // Group Import via UI upload acceleration
+ u.route("POST", importPattern+`gitlab_group`, upload.Accelerate(api, signingProxy, preparers.uploads)),
+
+ // Metric image upload
+ u.route("POST", apiPattern+`v4/projects/[0-9]+/issues/[0-9]+/metric_images\z`, upload.Accelerate(api, signingProxy, preparers.uploads)),
+
+ // Requirements Import via UI upload acceleration
+ u.route("POST", projectPattern+`requirements_management/requirements/import_csv`, upload.Accelerate(api, signingProxy, preparers.uploads)),
+
+ // Explicitly proxy API requests
+ u.route("", apiPattern, proxy),
+ u.route("", ciAPIPattern, proxy),
+
+ // Serve assets
+ u.route(
+ "", `^/assets/`,
+ static.ServeExisting(
+ u.URLPrefix,
+ staticpages.CacheExpireMax,
+ assetsNotFoundHandler,
+ ),
+ withoutTracing(), // Tracing on assets is very noisy
+ ),
+
+ // Uploads
+ u.route("POST", projectPattern+`uploads\z`, upload.Accelerate(api, signingProxy, preparers.uploads)),
+ u.route("POST", snippetUploadPattern, upload.Accelerate(api, signingProxy, preparers.uploads)),
+ u.route("POST", userUploadPattern, upload.Accelerate(api, signingProxy, preparers.uploads)),
+
+ // For legacy reasons, user uploads are stored under the document root.
+ // To prevent anybody who knows/guesses the URL of a user-uploaded file
+ // from downloading it we make sure requests to /uploads/ do _not_ pass
+ // through static.ServeExisting.
+ u.route("", `^/uploads/`, static.ErrorPagesUnless(u.DevelopmentMode, staticpages.ErrorFormatHTML, proxy)),
+
+ // health checks don't intercept errors and go straight to rails
+ // TODO: We should probably not return an HTML deploy page?
+ // https://gitlab.com/gitlab-org/gitlab-workhorse/issues/230
+ u.route("", "^/-/(readiness|liveness)$", static.DeployPage(probeUpstream)),
+ u.route("", "^/-/health$", static.DeployPage(healthUpstream)),
+
+ // This route lets us filter out health checks from our metrics.
+ u.route("", "^/-/", defaultUpstream),
+
+ u.route("", "", defaultUpstream),
+ }
+}
+
+func createUploadPreparers(cfg config.Config) uploadPreparers {
+ defaultPreparer := upload.NewObjectStoragePreparer(cfg)
+
+ return uploadPreparers{
+ artifacts: defaultPreparer,
+ lfs: lfs.NewLfsUploadPreparer(cfg, defaultPreparer),
+ packages: defaultPreparer,
+ uploads: defaultPreparer,
+ }
+}
+
+func denyWebsocket(next http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if websocket.IsWebSocketUpgrade(r) {
+ helper.HTTPError(w, r, "websocket upgrade not allowed", http.StatusBadRequest)
+ return
+ }
+
+ next.ServeHTTP(w, r)
+ })
+}
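
To make the route patterns above easier to read, here is a standalone sketch (not part of the diff) that exercises gitProjectPattern against some invented paths. The same shape applies to the other patterns: each is anchored at the start of the already-cleaned, prefix-stripped path that ServeHTTP hands to isMatch.

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// gitProjectPattern from routes.go; the sample paths below are made up.
	gitProject := regexp.MustCompile(`^/([^/]+/){1,}[^/]+\.git/`)

	for _, p := range []string{
		"/group/project.git/info/refs",                 // matches: top-level project
		"/group/subgroup/project.git/git-upload-pack",  // matches: nested group
		"/group/project/info/refs",                     // no ".git" segment, no match
	} {
		fmt.Printf("%-50s %v\n", p, gitProject.MatchString(p))
	}
}
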
diff --git a/workhorse/internal/upstream/upstream.go b/workhorse/internal/upstream/upstream.go
new file mode 100644
index 00000000000..fd3f6191a5a
--- /dev/null
+++ b/workhorse/internal/upstream/upstream.go
@@ -0,0 +1,123 @@
+/*
+The upstream type implements http.Handler.
+
+In this file we handle request routing and interaction with the authBackend.
+*/
+
+package upstream
+
+import (
+ "fmt"
+
+ "net/http"
+ "strings"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/labkit/correlation"
+
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/upload"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/upstream/roundtripper"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/urlprefix"
+)
+
+var (
+ DefaultBackend = helper.URLMustParse("http://localhost:8080")
+ requestHeaderBlacklist = []string{
+ upload.RewrittenFieldsHeader,
+ }
+)
+
+type upstream struct {
+ config.Config
+ URLPrefix urlprefix.Prefix
+ Routes []routeEntry
+ RoundTripper http.RoundTripper
+ CableRoundTripper http.RoundTripper
+ accessLogger *logrus.Logger
+}
+
+func NewUpstream(cfg config.Config, accessLogger *logrus.Logger) http.Handler {
+ up := upstream{
+ Config: cfg,
+ accessLogger: accessLogger,
+ }
+ if up.Backend == nil {
+ up.Backend = DefaultBackend
+ }
+ if up.CableBackend == nil {
+ up.CableBackend = up.Backend
+ }
+ if up.CableSocket == "" {
+ up.CableSocket = up.Socket
+ }
+ up.RoundTripper = roundtripper.NewBackendRoundTripper(up.Backend, up.Socket, up.ProxyHeadersTimeout, cfg.DevelopmentMode)
+ up.CableRoundTripper = roundtripper.NewBackendRoundTripper(up.CableBackend, up.CableSocket, up.ProxyHeadersTimeout, cfg.DevelopmentMode)
+ up.configureURLPrefix()
+ up.configureRoutes()
+
+ var correlationOpts []correlation.InboundHandlerOption
+ if cfg.PropagateCorrelationID {
+ correlationOpts = append(correlationOpts, correlation.WithPropagation())
+ }
+
+ handler := correlation.InjectCorrelationID(&up, correlationOpts...)
+ return handler
+}
+
+func (u *upstream) configureURLPrefix() {
+ relativeURLRoot := u.Backend.Path
+ if !strings.HasSuffix(relativeURLRoot, "/") {
+ relativeURLRoot += "/"
+ }
+ u.URLPrefix = urlprefix.Prefix(relativeURLRoot)
+}
+
+func (u *upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ helper.FixRemoteAddr(r)
+
+ helper.DisableResponseBuffering(w)
+
+ // Drop RequestURI == "*" (FIXME: why?)
+ if r.RequestURI == "*" {
+ helper.HTTPError(w, r, "Connection upgrade not allowed", http.StatusBadRequest)
+ return
+ }
+
+ // Disallow connect
+ if r.Method == "CONNECT" {
+ helper.HTTPError(w, r, "CONNECT not allowed", http.StatusBadRequest)
+ return
+ }
+
+ // Check URL Root
+ URIPath := urlprefix.CleanURIPath(r.URL.Path)
+ prefix := u.URLPrefix
+ if !prefix.Match(URIPath) {
+ helper.HTTPError(w, r, fmt.Sprintf("Not found %q", URIPath), http.StatusNotFound)
+ return
+ }
+
+ // Look for a matching route
+ var route *routeEntry
+ for _, ro := range u.Routes {
+ if ro.isMatch(prefix.Strip(URIPath), r) {
+ route = &ro
+ break
+ }
+ }
+
+ if route == nil {
+ // The protocol spec in git/Documentation/technical/http-protocol.txt
+ // says we must return 403 if no matching service is found.
+ helper.HTTPError(w, r, "Forbidden", http.StatusForbidden)
+ return
+ }
+
+ for _, h := range requestHeaderBlacklist {
+ r.Header.Del(h)
+ }
+
+ route.handler.ServeHTTP(w, r)
+}
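
Finally, a sketch (not part of the diff) of how NewUpstream might be wired into a plain HTTP server from inside the workhorse module. The listen address and the near-empty config are assumptions for the example; the real binary populates config.Config elsewhere.

package main

import (
	"net/http"

	"github.com/sirupsen/logrus"

	"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
	"gitlab.com/gitlab-org/gitlab-workhorse/internal/upstream"
)

func main() {
	// Near-empty config: Backend falls back to DefaultBackend
	// (http://localhost:8080) and CableBackend falls back to Backend.
	cfg := config.Config{DevelopmentMode: true}

	handler := upstream.NewUpstream(cfg, logrus.New())

	// The listen address is an assumption for the example.
	if err := http.ListenAndServe("127.0.0.1:8181", handler); err != nil {
		panic(err)
	}
}
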