diff options
Diffstat (limited to 'workhorse/internal/upstream')
-rw-r--r-- | workhorse/internal/upstream/development_test.go | 39 | ||||
-rw-r--r-- | workhorse/internal/upstream/handlers.go | 39 | ||||
-rw-r--r-- | workhorse/internal/upstream/handlers_test.go | 67 | ||||
-rw-r--r-- | workhorse/internal/upstream/metrics.go | 117 | ||||
-rw-r--r-- | workhorse/internal/upstream/notfoundunless.go | 11 | ||||
-rw-r--r-- | workhorse/internal/upstream/roundtripper/roundtripper.go | 61 | ||||
-rw-r--r-- | workhorse/internal/upstream/roundtripper/roundtripper_test.go | 39 | ||||
-rw-r--r-- | workhorse/internal/upstream/roundtripper/transport.go | 27 | ||||
-rw-r--r-- | workhorse/internal/upstream/routes.go | 345 | ||||
-rw-r--r-- | workhorse/internal/upstream/upstream.go | 123 |
10 files changed, 868 insertions, 0 deletions
diff --git a/workhorse/internal/upstream/development_test.go b/workhorse/internal/upstream/development_test.go new file mode 100644 index 00000000000..d2957abb18b --- /dev/null +++ b/workhorse/internal/upstream/development_test.go @@ -0,0 +1,39 @@ +package upstream + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDevelopmentModeEnabled(t *testing.T) { + developmentMode := true + + r, _ := http.NewRequest("GET", "/something", nil) + w := httptest.NewRecorder() + + executed := false + NotFoundUnless(developmentMode, http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) { + executed = true + })).ServeHTTP(w, r) + + require.True(t, executed, "The handler should get executed") +} + +func TestDevelopmentModeDisabled(t *testing.T) { + developmentMode := false + + r, _ := http.NewRequest("GET", "/something", nil) + w := httptest.NewRecorder() + + executed := false + NotFoundUnless(developmentMode, http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) { + executed = true + })).ServeHTTP(w, r) + + require.False(t, executed, "The handler should not get executed") + + require.Equal(t, 404, w.Code) +} diff --git a/workhorse/internal/upstream/handlers.go b/workhorse/internal/upstream/handlers.go new file mode 100644 index 00000000000..a6aa148a4ae --- /dev/null +++ b/workhorse/internal/upstream/handlers.go @@ -0,0 +1,39 @@ +package upstream + +import ( + "compress/gzip" + "fmt" + "io" + "net/http" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper" +) + +func contentEncodingHandler(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var body io.ReadCloser + var err error + + // The client request body may have been gzipped. 
+ contentEncoding := r.Header.Get("Content-Encoding") + switch contentEncoding { + case "": + body = r.Body + case "gzip": + body, err = gzip.NewReader(r.Body) + default: + err = fmt.Errorf("unsupported content encoding: %s", contentEncoding) + } + + if err != nil { + helper.Fail500(w, r, fmt.Errorf("contentEncodingHandler: %v", err)) + return + } + defer body.Close() + + r.Body = body + r.Header.Del("Content-Encoding") + + h.ServeHTTP(w, r) + }) +} diff --git a/workhorse/internal/upstream/handlers_test.go b/workhorse/internal/upstream/handlers_test.go new file mode 100644 index 00000000000..10c7479f5c5 --- /dev/null +++ b/workhorse/internal/upstream/handlers_test.go @@ -0,0 +1,67 @@ +package upstream + +import ( + "bytes" + "compress/gzip" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGzipEncoding(t *testing.T) { + resp := httptest.NewRecorder() + + var b bytes.Buffer + w := gzip.NewWriter(&b) + fmt.Fprint(w, "test") + w.Close() + + body := ioutil.NopCloser(&b) + + req, err := http.NewRequest("POST", "http://address/test", body) + require.NoError(t, err) + req.Header.Set("Content-Encoding", "gzip") + + contentEncodingHandler(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) { + require.IsType(t, &gzip.Reader{}, r.Body, "body type") + require.Empty(t, r.Header.Get("Content-Encoding"), "Content-Encoding should be deleted") + })).ServeHTTP(resp, req) + + require.Equal(t, 200, resp.Code) +} + +func TestNoEncoding(t *testing.T) { + resp := httptest.NewRecorder() + + var b bytes.Buffer + body := ioutil.NopCloser(&b) + + req, err := http.NewRequest("POST", "http://address/test", body) + require.NoError(t, err) + req.Header.Set("Content-Encoding", "") + + contentEncodingHandler(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) { + require.Equal(t, body, r.Body, "Expected the same body") + require.Empty(t, r.Header.Get("Content-Encoding"), "Content-Encoding should be 
// secondsDurationBuckets returns the histogram bucket upper bounds, in
// seconds, shared by the duration histograms in this file. A new slice is
// built on every call, so callers may modify the result freely.
func secondsDurationBuckets() []float64 {
	bounds := make([]float64, 0, 9)

	bounds = append(bounds,
		0.005, // 5ms
		0.025, // 25ms
		0.1,   // 100ms
		0.5,   // 500ms
		1.0,   // 1s
		10.0,  // 10s
		30.0,  // 30s
		60.0,  // 1m
		300.0, // 5m
	)

	return bounds
}
prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: httpSubsystem, + Name: "request_duration_seconds", + Help: "A histogram of latencies for requests to workhorse.", + Buckets: secondsDurationBuckets(), + }, + []string{"code", "method", "route"}, + ) + + httpRequestSizeBytes = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: httpSubsystem, + Name: "request_size_bytes", + Help: "A histogram of sizes of requests to workhorse.", + Buckets: byteSizeBuckets(), + }, + []string{"code", "method", "route"}, + ) + + httpResponseSizeBytes = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: httpSubsystem, + Name: "response_size_bytes", + Help: "A histogram of response sizes for requests to workhorse.", + Buckets: byteSizeBuckets(), + }, + []string{"code", "method", "route"}, + ) + + httpTimeToWriteHeaderSeconds = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: httpSubsystem, + Name: "time_to_write_header_seconds", + Help: "A histogram of request durations until the response headers are written.", + Buckets: secondsDurationBuckets(), + }, + []string{"code", "method", "route"}, + ) +) + +func instrumentRoute(next http.Handler, method string, regexpStr string) http.Handler { + handler := next + + handler = promhttp.InstrumentHandlerCounter(httpRequestsTotal.MustCurryWith(map[string]string{"route": regexpStr}), handler) + handler = promhttp.InstrumentHandlerDuration(httpRequestDurationSeconds.MustCurryWith(map[string]string{"route": regexpStr}), handler) + handler = promhttp.InstrumentHandlerInFlight(httpInFlightRequests, handler) + handler = promhttp.InstrumentHandlerRequestSize(httpRequestSizeBytes.MustCurryWith(map[string]string{"route": regexpStr}), handler) + handler = promhttp.InstrumentHandlerResponseSize(httpResponseSizeBytes.MustCurryWith(map[string]string{"route": regexpStr}), handler) + handler = 
// mustParseAddress normalises a backend address into a "host:port" string
// that can be handed to a TCP dialer.
//
// address is tried as-is first; when it carries no port, scheme is appended
// as the service name (for example "4.5.6.7" with scheme "http" becomes
// "4.5.6.7:http"). IPv6 literals keep their brackets in the result
// ("[::1]:23"), because an unbracketed "::1:23" is ambiguous and cannot be
// dialled.
//
// It panics when scheme is "https" (TLS to the backend is unsupported) or
// when neither form yields a non-empty host and port.
func mustParseAddress(address, scheme string) string {
	if scheme == "https" {
		panic("TLS is not supported for backend connections")
	}

	for _, candidate := range []string{address, address + ":" + scheme} {
		host, port, err := net.SplitHostPort(candidate)
		if err != nil || host == "" || port == "" {
			continue
		}

		// JoinHostPort re-adds the brackets that SplitHostPort strips from
		// IPv6 literals; for IPv4 addresses and host names the result is
		// exactly host + ":" + port.
		return net.JoinHostPort(host, port)
	}

	panic(fmt.Errorf("could not parse host:port from address %q and scheme %q", address, scheme))
}
+ transport, dialer := newBackendTransport() + transport.ResponseHeaderTimeout = proxyHeadersTimeout + + if backend != nil && socket == "" { + address := mustParseAddress(backend.Host, backend.Scheme) + transport.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) { + return dialer.DialContext(ctx, "tcp", address) + } + } else if socket != "" { + transport.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) { + return dialer.DialContext(ctx, "unix", socket) + } + } else { + panic("backend is nil and socket is empty") + } + + return tracing.NewRoundTripper( + correlation.NewInstrumentedRoundTripper( + badgateway.NewRoundTripper(developmentMode, transport), + ), + ) +} + +// NewTestBackendRoundTripper sets up a RoundTripper for testing purposes +func NewTestBackendRoundTripper(backend *url.URL) http.RoundTripper { + return NewBackendRoundTripper(backend, "", 0, true) +} diff --git a/workhorse/internal/upstream/roundtripper/roundtripper_test.go b/workhorse/internal/upstream/roundtripper/roundtripper_test.go new file mode 100644 index 00000000000..79ffa244918 --- /dev/null +++ b/workhorse/internal/upstream/roundtripper/roundtripper_test.go @@ -0,0 +1,39 @@ +package roundtripper + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMustParseAddress(t *testing.T) { + successExamples := []struct{ address, scheme, expected string }{ + {"1.2.3.4:56", "http", "1.2.3.4:56"}, + {"[::1]:23", "http", "::1:23"}, + {"4.5.6.7", "http", "4.5.6.7:http"}, + } + for i, example := range successExamples { + t.Run(strconv.Itoa(i), func(t *testing.T) { + require.Equal(t, example.expected, mustParseAddress(example.address, example.scheme)) + }) + } +} + +func TestMustParseAddressPanic(t *testing.T) { + panicExamples := []struct{ address, scheme string }{ + {"1.2.3.4", ""}, + {"1.2.3.4", "https"}, + } + + for i, panicExample := range panicExamples { + t.Run(strconv.Itoa(i), func(t *testing.T) { + 
// newBackendTransport sets up the default HTTP transport which Workhorse uses
// to communicate with the upstream. The dialer backing the transport is
// returned as well, so callers can install their own DialContext on top of it.
func newBackendTransport() (*http.Transport, *net.Dialer) {
	const (
		dialTimeout           = 30 * time.Second
		keepAlive             = 30 * time.Second
		idleConnTimeout       = 90 * time.Second
		tlsHandshakeTimeout   = 10 * time.Second
		expectContinueTimeout = 1 * time.Second
		maxIdleConns          = 100
	)

	dialer := new(net.Dialer)
	dialer.Timeout = dialTimeout
	dialer.KeepAlive = keepAlive

	return &http.Transport{
		Proxy:                 http.ProxyFromEnvironment,
		DialContext:           dialer.DialContext,
		MaxIdleConns:          maxIdleConns,
		IdleConnTimeout:       idleConnTimeout,
		TLSHandshakeTimeout:   tlsHandshakeTimeout,
		ExpectContinueTimeout: expectContinueTimeout,
	}, dialer
}
// compileRegexp turns a route pattern into a compiled regular expression.
// The empty pattern yields nil; any other pattern is compiled with
// regexp.MustCompile, which panics on invalid syntax.
func compileRegexp(regexpStr string) *regexp.Regexp {
	if regexpStr != "" {
		return regexp.MustCompile(regexpStr)
	}

	return nil
}
+ handler = instrumentRoute(handler, method, regexpStr) // Add prometheus metrics + return handler +} + +func (u *upstream) route(method, regexpStr string, handler http.Handler, opts ...func(*routeOptions)) routeEntry { + // Instantiate a route with the defaults + options := routeOptions{ + tracing: true, + } + + for _, f := range opts { + f(&options) + } + + handler = u.observabilityMiddlewares(handler, method, regexpStr) + handler = denyWebsocket(handler) // Disallow websockets + if options.tracing { + // Add distributed tracing + handler = tracing.Handler(handler, tracing.WithRouteIdentifier(regexpStr)) + } + + return routeEntry{ + method: method, + regex: compileRegexp(regexpStr), + handler: handler, + matchers: options.matchers, + } +} + +func (u *upstream) wsRoute(regexpStr string, handler http.Handler, matchers ...matcherFunc) routeEntry { + method := "GET" + handler = u.observabilityMiddlewares(handler, method, regexpStr) + + return routeEntry{ + method: method, + regex: compileRegexp(regexpStr), + handler: handler, + matchers: append(matchers, websocket.IsWebSocketUpgrade), + } +} + +// Creates matcherFuncs for a particular content type. 
+func isContentType(contentType string) func(*http.Request) bool { + return func(r *http.Request) bool { + return helper.IsContentType(contentType, r.Header.Get("Content-Type")) + } +} + +func (ro *routeEntry) isMatch(cleanedPath string, req *http.Request) bool { + if ro.method != "" && req.Method != ro.method { + return false + } + + if ro.regex != nil && !ro.regex.MatchString(cleanedPath) { + return false + } + + ok := true + for _, matcher := range ro.matchers { + ok = matcher(req) + if !ok { + break + } + } + + return ok +} + +func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg config.Config) http.Handler { + proxier := proxypkg.NewProxy(backend, version, rt) + + return senddata.SendData( + sendfile.SendFile(apipkg.Block(proxier)), + git.SendArchive, + git.SendBlob, + git.SendDiff, + git.SendPatch, + git.SendSnapshot, + artifacts.SendEntry, + sendurl.SendURL, + imageresizer.NewResizer(cfg), + ) +} + +// Routing table +// We match against URI not containing the relativeUrlRoot: +// see upstream.ServeHTTP + +func (u *upstream) configureRoutes() { + api := apipkg.NewAPI( + u.Backend, + u.Version, + u.RoundTripper, + ) + + static := &staticpages.Static{DocumentRoot: u.DocumentRoot} + proxy := buildProxy(u.Backend, u.Version, u.RoundTripper, u.Config) + cableProxy := proxypkg.NewProxy(u.CableBackend, u.Version, u.CableRoundTripper) + + assetsNotFoundHandler := NotFoundUnless(u.DevelopmentMode, proxy) + if u.AltDocumentRoot != "" { + altStatic := &staticpages.Static{DocumentRoot: u.AltDocumentRoot} + assetsNotFoundHandler = altStatic.ServeExisting( + u.URLPrefix, + staticpages.CacheExpireMax, + NotFoundUnless(u.DevelopmentMode, proxy), + ) + } + + signingTripper := secret.NewRoundTripper(u.RoundTripper, u.Version) + signingProxy := buildProxy(u.Backend, u.Version, signingTripper, u.Config) + + preparers := createUploadPreparers(u.Config) + uploadPath := path.Join(u.DocumentRoot, "uploads/tmp") + uploadAccelerateProxy := 
upload.Accelerate(&upload.SkipRailsAuthorizer{TempPath: uploadPath}, proxy, preparers.uploads) + ciAPIProxyQueue := queueing.QueueRequests("ci_api_job_requests", uploadAccelerateProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout) + ciAPILongPolling := builds.RegisterHandler(ciAPIProxyQueue, redis.WatchKey, u.APICILongPollingDuration) + + // Serve static files or forward the requests + defaultUpstream := static.ServeExisting( + u.URLPrefix, + staticpages.CacheDisabled, + static.DeployPage(static.ErrorPagesUnless(u.DevelopmentMode, staticpages.ErrorFormatHTML, uploadAccelerateProxy)), + ) + probeUpstream := static.ErrorPagesUnless(u.DevelopmentMode, staticpages.ErrorFormatJSON, proxy) + healthUpstream := static.ErrorPagesUnless(u.DevelopmentMode, staticpages.ErrorFormatText, proxy) + + u.Routes = []routeEntry{ + // Git Clone + u.route("GET", gitProjectPattern+`info/refs\z`, git.GetInfoRefsHandler(api)), + u.route("POST", gitProjectPattern+`git-upload-pack\z`, contentEncodingHandler(git.UploadPack(api)), withMatcher(isContentType("application/x-git-upload-pack-request"))), + u.route("POST", gitProjectPattern+`git-receive-pack\z`, contentEncodingHandler(git.ReceivePack(api)), withMatcher(isContentType("application/x-git-receive-pack-request"))), + u.route("PUT", gitProjectPattern+`gitlab-lfs/objects/([0-9a-f]{64})/([0-9]+)\z`, lfs.PutStore(api, signingProxy, preparers.lfs), withMatcher(isContentType("application/octet-stream"))), + + // CI Artifacts + u.route("POST", apiPattern+`v4/jobs/[0-9]+/artifacts\z`, contentEncodingHandler(artifacts.UploadArtifacts(api, signingProxy, preparers.artifacts))), + u.route("POST", ciAPIPattern+`v1/builds/[0-9]+/artifacts\z`, contentEncodingHandler(artifacts.UploadArtifacts(api, signingProxy, preparers.artifacts))), + + // ActionCable websocket + u.wsRoute(`^/-/cable\z`, cableProxy), + + // Terminal websocket + u.wsRoute(projectPattern+`-/environments/[0-9]+/terminal.ws\z`, channel.Handler(api)), + 
u.wsRoute(projectPattern+`-/jobs/[0-9]+/terminal.ws\z`, channel.Handler(api)), + + // Proxy Job Services + u.wsRoute(projectPattern+`-/jobs/[0-9]+/proxy.ws\z`, channel.Handler(api)), + + // Long poll and limit capacity given to jobs/request and builds/register.json + u.route("", apiPattern+`v4/jobs/request\z`, ciAPILongPolling), + u.route("", ciAPIPattern+`v1/builds/register.json\z`, ciAPILongPolling), + + // Maven Artifact Repository + u.route("PUT", apiPattern+`v4/projects/[0-9]+/packages/maven/`, upload.BodyUploader(api, signingProxy, preparers.packages)), + + // Conan Artifact Repository + u.route("PUT", apiPattern+`v4/packages/conan/`, upload.BodyUploader(api, signingProxy, preparers.packages)), + u.route("PUT", apiPattern+`v4/projects/[0-9]+/packages/conan/`, upload.BodyUploader(api, signingProxy, preparers.packages)), + + // Generic Packages Repository + u.route("PUT", apiPattern+`v4/projects/[0-9]+/packages/generic/`, upload.BodyUploader(api, signingProxy, preparers.packages)), + + // NuGet Artifact Repository + u.route("PUT", apiPattern+`v4/projects/[0-9]+/packages/nuget/`, upload.Accelerate(api, signingProxy, preparers.packages)), + + // PyPI Artifact Repository + u.route("POST", apiPattern+`v4/projects/[0-9]+/packages/pypi`, upload.Accelerate(api, signingProxy, preparers.packages)), + + // Debian Artifact Repository + u.route("PUT", apiPattern+`v4/projects/[0-9]+/-/packages/debian/incoming/`, upload.BodyUploader(api, signingProxy, preparers.packages)), + + // We are porting API to disk acceleration + // we need to declare each routes until we have fixed all the routes on the rails codebase. 
+ // Overall status can be seen at https://gitlab.com/groups/gitlab-org/-/epics/1802#current-status + u.route("POST", apiPattern+`v4/projects/[0-9]+/wikis/attachments\z`, uploadAccelerateProxy), + u.route("POST", apiPattern+`graphql\z`, uploadAccelerateProxy), + u.route("POST", apiPattern+`v4/groups/import`, upload.Accelerate(api, signingProxy, preparers.uploads)), + u.route("POST", apiPattern+`v4/projects/import`, upload.Accelerate(api, signingProxy, preparers.uploads)), + + // Project Import via UI upload acceleration + u.route("POST", importPattern+`gitlab_project`, upload.Accelerate(api, signingProxy, preparers.uploads)), + // Group Import via UI upload acceleration + u.route("POST", importPattern+`gitlab_group`, upload.Accelerate(api, signingProxy, preparers.uploads)), + + // Metric image upload + u.route("POST", apiPattern+`v4/projects/[0-9]+/issues/[0-9]+/metric_images\z`, upload.Accelerate(api, signingProxy, preparers.uploads)), + + // Requirements Import via UI upload acceleration + u.route("POST", projectPattern+`requirements_management/requirements/import_csv`, upload.Accelerate(api, signingProxy, preparers.uploads)), + + // Explicitly proxy API requests + u.route("", apiPattern, proxy), + u.route("", ciAPIPattern, proxy), + + // Serve assets + u.route( + "", `^/assets/`, + static.ServeExisting( + u.URLPrefix, + staticpages.CacheExpireMax, + assetsNotFoundHandler, + ), + withoutTracing(), // Tracing on assets is very noisy + ), + + // Uploads + u.route("POST", projectPattern+`uploads\z`, upload.Accelerate(api, signingProxy, preparers.uploads)), + u.route("POST", snippetUploadPattern, upload.Accelerate(api, signingProxy, preparers.uploads)), + u.route("POST", userUploadPattern, upload.Accelerate(api, signingProxy, preparers.uploads)), + + // For legacy reasons, user uploads are stored under the document root. 
+ // To prevent anybody who knows/guesses the URL of a user-uploaded file + // from downloading it we make sure requests to /uploads/ do _not_ pass + // through static.ServeExisting. + u.route("", `^/uploads/`, static.ErrorPagesUnless(u.DevelopmentMode, staticpages.ErrorFormatHTML, proxy)), + + // health checks don't intercept errors and go straight to rails + // TODO: We should probably not return a HTML deploy page? + // https://gitlab.com/gitlab-org/gitlab-workhorse/issues/230 + u.route("", "^/-/(readiness|liveness)$", static.DeployPage(probeUpstream)), + u.route("", "^/-/health$", static.DeployPage(healthUpstream)), + + // This route lets us filter out health checks from our metrics. + u.route("", "^/-/", defaultUpstream), + + u.route("", "", defaultUpstream), + } +} + +func createUploadPreparers(cfg config.Config) uploadPreparers { + defaultPreparer := upload.NewObjectStoragePreparer(cfg) + + return uploadPreparers{ + artifacts: defaultPreparer, + lfs: lfs.NewLfsUploadPreparer(cfg, defaultPreparer), + packages: defaultPreparer, + uploads: defaultPreparer, + } +} + +func denyWebsocket(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if websocket.IsWebSocketUpgrade(r) { + helper.HTTPError(w, r, "websocket upgrade not allowed", http.StatusBadRequest) + return + } + + next.ServeHTTP(w, r) + }) +} diff --git a/workhorse/internal/upstream/upstream.go b/workhorse/internal/upstream/upstream.go new file mode 100644 index 00000000000..fd3f6191a5a --- /dev/null +++ b/workhorse/internal/upstream/upstream.go @@ -0,0 +1,123 @@ +/* +The upstream type implements http.Handler. + +In this file we handle request routing and interaction with the authBackend. 
// upstream is the http.Handler returned by NewUpstream. It holds the
// configuration, the routing table and the round trippers used to reach the
// backends.
type upstream struct {
	// Config is embedded, so its fields (Backend, Socket, DevelopmentMode,
	// ...) are read directly off the upstream value.
	config.Config
	// URLPrefix is derived from Backend.Path by configureURLPrefix. ServeHTTP
	// answers 404 for paths that do not match it and strips it before route
	// matching.
	URLPrefix urlprefix.Prefix
	// Routes is the routing table filled in by configureRoutes. ServeHTTP
	// scans it in order and uses the first entry that matches.
	Routes []routeEntry
	// RoundTripper is built by NewUpstream from Backend and Socket.
	RoundTripper http.RoundTripper
	// CableRoundTripper is built by NewUpstream from CableBackend and
	// CableSocket, which default to Backend and Socket when unset.
	CableRoundTripper http.RoundTripper
	// accessLogger is handed to the access-log middleware of every route.
	accessLogger *logrus.Logger
}
+ return handler +} + +func (u *upstream) configureURLPrefix() { + relativeURLRoot := u.Backend.Path + if !strings.HasSuffix(relativeURLRoot, "/") { + relativeURLRoot += "/" + } + u.URLPrefix = urlprefix.Prefix(relativeURLRoot) +} + +func (u *upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) { + helper.FixRemoteAddr(r) + + helper.DisableResponseBuffering(w) + + // Drop RequestURI == "*" (FIXME: why?) + if r.RequestURI == "*" { + helper.HTTPError(w, r, "Connection upgrade not allowed", http.StatusBadRequest) + return + } + + // Disallow connect + if r.Method == "CONNECT" { + helper.HTTPError(w, r, "CONNECT not allowed", http.StatusBadRequest) + return + } + + // Check URL Root + URIPath := urlprefix.CleanURIPath(r.URL.Path) + prefix := u.URLPrefix + if !prefix.Match(URIPath) { + helper.HTTPError(w, r, fmt.Sprintf("Not found %q", URIPath), http.StatusNotFound) + return + } + + // Look for a matching route + var route *routeEntry + for _, ro := range u.Routes { + if ro.isMatch(prefix.Strip(URIPath), r) { + route = &ro + break + } + } + + if route == nil { + // The protocol spec in git/Documentation/technical/http-protocol.txt + // says we must return 403 if no matching service is found. + helper.HTTPError(w, r, "Forbidden", http.StatusForbidden) + return + } + + for _, h := range requestHeaderBlacklist { + r.Header.Del(h) + } + + route.handler.ServeHTTP(w, r) +} |