Diffstat (limited to 'workhorse/internal/upload')
 workhorse/internal/upload/artifacts_store_test.go | 335
 workhorse/internal/upload/artifacts_upload_test.go | 331
 workhorse/internal/upload/artifacts_uploader.go | 167
 workhorse/internal/upload/body_uploader.go | 35
 workhorse/internal/upload/body_uploader_test.go | 10
 workhorse/internal/upload/destination/destination.go | 236
 workhorse/internal/upload/destination/destination_test.go | 504
 workhorse/internal/upload/destination/filestore/filestore.go | 21
 workhorse/internal/upload/destination/filestore/filestore_test.go | 38
 workhorse/internal/upload/destination/multi_hash.go | 48
 workhorse/internal/upload/destination/objectstore/doc.go | 3
 workhorse/internal/upload/destination/objectstore/gocloud_object.go | 100
 workhorse/internal/upload/destination/objectstore/gocloud_object_test.go | 56
 workhorse/internal/upload/destination/objectstore/multipart.go | 187
 workhorse/internal/upload/destination/objectstore/multipart_test.go | 64
 workhorse/internal/upload/destination/objectstore/object.go | 95
 workhorse/internal/upload/destination/objectstore/object_test.go | 149
 workhorse/internal/upload/destination/objectstore/prometheus.go | 39
 workhorse/internal/upload/destination/objectstore/s3_complete_multipart_api.go | 51
 workhorse/internal/upload/destination/objectstore/s3_object.go | 119
 workhorse/internal/upload/destination/objectstore/s3_object_test.go | 174
 workhorse/internal/upload/destination/objectstore/s3_session.go | 94
 workhorse/internal/upload/destination/objectstore/s3_session_test.go | 57
 workhorse/internal/upload/destination/objectstore/test/consts.go | 19
 workhorse/internal/upload/destination/objectstore/test/gocloud_stub.go | 47
 workhorse/internal/upload/destination/objectstore/test/objectstore_stub.go | 278
 workhorse/internal/upload/destination/objectstore/test/objectstore_stub_test.go | 167
 workhorse/internal/upload/destination/objectstore/test/s3_stub.go | 142
 workhorse/internal/upload/destination/objectstore/upload_strategy.go | 46
 workhorse/internal/upload/destination/objectstore/uploader.go | 115
 workhorse/internal/upload/destination/reader.go | 17
 workhorse/internal/upload/destination/reader_test.go | 46
 workhorse/internal/upload/destination/upload_opts.go | 171
 workhorse/internal/upload/destination/upload_opts_test.go | 342
 workhorse/internal/upload/lfs_preparer.go | 47
 workhorse/internal/upload/lfs_preparer_test.go | 59
 workhorse/internal/upload/multipart_uploader.go (renamed from workhorse/internal/upload/accelerate.go) | 11
 workhorse/internal/upload/object_storage_preparer.go | 9
 workhorse/internal/upload/preparer.go | 33
 workhorse/internal/upload/rewrite.go | 10
 workhorse/internal/upload/saved_file_tracker.go | 4
 workhorse/internal/upload/saved_file_tracker_test.go | 6
 workhorse/internal/upload/uploads.go | 26
 workhorse/internal/upload/uploads_test.go | 20
 44 files changed, 4450 insertions(+), 78 deletions(-)
diff --git a/workhorse/internal/upload/artifacts_store_test.go b/workhorse/internal/upload/artifacts_store_test.go
new file mode 100644
index 00000000000..97e66fc37a4
--- /dev/null
+++ b/workhorse/internal/upload/artifacts_store_test.go
@@ -0,0 +1,335 @@
+package upload
+
+import (
+ "archive/zip"
+ "bytes"
+ "crypto/md5"
+ "encoding/hex"
+ "fmt"
+ "io/ioutil"
+ "mime/multipart"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
+)
+
+func createTestZipArchive(t *testing.T) (data []byte, md5Hash string) {
+ var buffer bytes.Buffer
+ archive := zip.NewWriter(&buffer)
+ fileInArchive, err := archive.Create("test.file")
+ require.NoError(t, err)
+ fmt.Fprint(fileInArchive, "test")
+ archive.Close()
+ data = buffer.Bytes()
+
+ hasher := md5.New()
+ hasher.Write(data)
+ hexHash := hasher.Sum(nil)
+ md5Hash = hex.EncodeToString(hexHash)
+
+ return data, md5Hash
+}
+
+func createTestMultipartForm(t *testing.T, data []byte) (bytes.Buffer, string) {
+ var buffer bytes.Buffer
+ writer := multipart.NewWriter(&buffer)
+ file, err := writer.CreateFormFile("file", "my.file")
+ require.NoError(t, err)
+ file.Write(data)
+ writer.Close()
+ return buffer, writer.FormDataContentType()
+}
+
+func testUploadArtifactsFromTestZip(t *testing.T, ts *httptest.Server) *httptest.ResponseRecorder {
+ archiveData, _ := createTestZipArchive(t)
+ contentBuffer, contentType := createTestMultipartForm(t, archiveData)
+
+ return testUploadArtifacts(t, contentType, ts.URL+Path, &contentBuffer)
+}
+
+func TestUploadHandlerSendingToExternalStorage(t *testing.T) {
+ tempPath, err := ioutil.TempDir("", "uploads")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(tempPath)
+
+ archiveData, md5 := createTestZipArchive(t)
+ archiveFile, err := ioutil.TempFile("", "artifact.zip")
+ require.NoError(t, err)
+ defer os.Remove(archiveFile.Name())
+ _, err = archiveFile.Write(archiveData)
+ require.NoError(t, err)
+ archiveFile.Close()
+
+ storeServerCalled := 0
+ storeServerMux := http.NewServeMux()
+ storeServerMux.HandleFunc("/url/put", func(w http.ResponseWriter, r *http.Request) {
+ require.Equal(t, "PUT", r.Method)
+
+ receivedData, err := ioutil.ReadAll(r.Body)
+ require.NoError(t, err)
+ require.Equal(t, archiveData, receivedData)
+
+ storeServerCalled++
+ w.Header().Set("ETag", md5)
+ w.WriteHeader(200)
+ })
+ storeServerMux.HandleFunc("/store-id", func(w http.ResponseWriter, r *http.Request) {
+ http.ServeFile(w, r, archiveFile.Name())
+ })
+
+ responseProcessorCalled := 0
+ responseProcessor := func(w http.ResponseWriter, r *http.Request) {
+ require.Equal(t, "store-id", r.FormValue("file.remote_id"))
+ require.NotEmpty(t, r.FormValue("file.remote_url"))
+ w.WriteHeader(200)
+ responseProcessorCalled++
+ }
+
+ storeServer := httptest.NewServer(storeServerMux)
+ defer storeServer.Close()
+
+ qs := fmt.Sprintf("?%s=%s", ArtifactFormatKey, ArtifactFormatZip)
+
+ tests := []struct {
+ name string
+ preauth *api.Response
+ }{
+ {
+ name: "ObjectStore Upload",
+ preauth: &api.Response{
+ RemoteObject: api.RemoteObject{
+ StoreURL: storeServer.URL + "/url/put" + qs,
+ ID: "store-id",
+ GetURL: storeServer.URL + "/store-id",
+ },
+ },
+ },
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ storeServerCalled = 0
+ responseProcessorCalled = 0
+
+ ts := testArtifactsUploadServer(t, test.preauth, responseProcessor)
+ defer ts.Close()
+
+ contentBuffer, contentType := createTestMultipartForm(t, archiveData)
+ response := testUploadArtifacts(t, contentType, ts.URL+Path+qs, &contentBuffer)
+ require.Equal(t, http.StatusOK, response.Code)
+ testhelper.RequireResponseHeader(t, response, MetadataHeaderKey, MetadataHeaderPresent)
+ require.Equal(t, 1, storeServerCalled, "store should be called only once")
+ require.Equal(t, 1, responseProcessorCalled, "response processor should be called only once")
+ })
+ }
+}
+
+func TestUploadHandlerSendingToExternalStorageAndStorageServerUnreachable(t *testing.T) {
+ tempPath, err := ioutil.TempDir("", "uploads")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(tempPath)
+
+ responseProcessor := func(w http.ResponseWriter, r *http.Request) {
+ t.Fatal("it should not be called")
+ }
+
+ authResponse := &api.Response{
+ TempPath: tempPath,
+ RemoteObject: api.RemoteObject{
+ StoreURL: "http://localhost:12323/invalid/url",
+ ID: "store-id",
+ },
+ }
+
+ ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
+ defer ts.Close()
+
+ response := testUploadArtifactsFromTestZip(t, ts)
+ require.Equal(t, http.StatusInternalServerError, response.Code)
+}
+
+func TestUploadHandlerSendingToExternalStorageAndInvalidURLIsUsed(t *testing.T) {
+ tempPath, err := ioutil.TempDir("", "uploads")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(tempPath)
+
+ responseProcessor := func(w http.ResponseWriter, r *http.Request) {
+ t.Fatal("it should not be called")
+ }
+
+ authResponse := &api.Response{
+ TempPath: tempPath,
+ RemoteObject: api.RemoteObject{
+ StoreURL: "htt:////invalid-url",
+ ID: "store-id",
+ },
+ }
+
+ ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
+ defer ts.Close()
+
+ response := testUploadArtifactsFromTestZip(t, ts)
+ require.Equal(t, http.StatusInternalServerError, response.Code)
+}
+
+func TestUploadHandlerSendingToExternalStorageAndItReturnsAnError(t *testing.T) {
+ putCalledTimes := 0
+
+ storeServerMux := http.NewServeMux()
+ storeServerMux.HandleFunc("/url/put", func(w http.ResponseWriter, r *http.Request) {
+ putCalledTimes++
+ require.Equal(t, "PUT", r.Method)
+ w.WriteHeader(510)
+ })
+
+ responseProcessor := func(w http.ResponseWriter, r *http.Request) {
+ t.Fatal("it should not be called")
+ }
+
+ storeServer := httptest.NewServer(storeServerMux)
+ defer storeServer.Close()
+
+ authResponse := &api.Response{
+ RemoteObject: api.RemoteObject{
+ StoreURL: storeServer.URL + "/url/put",
+ ID: "store-id",
+ },
+ }
+
+ ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
+ defer ts.Close()
+
+ response := testUploadArtifactsFromTestZip(t, ts)
+ require.Equal(t, http.StatusInternalServerError, response.Code)
+ require.Equal(t, 1, putCalledTimes, "upload should be called only once")
+}
+
+func TestUploadHandlerSendingToExternalStorageAndSupportRequestTimeout(t *testing.T) {
+ putCalledTimes := 0
+
+ storeServerMux := http.NewServeMux()
+ storeServerMux.HandleFunc("/url/put", func(w http.ResponseWriter, r *http.Request) {
+ putCalledTimes++
+ require.Equal(t, "PUT", r.Method)
+ time.Sleep(10 * time.Second)
+ w.WriteHeader(510)
+ })
+
+ responseProcessor := func(w http.ResponseWriter, r *http.Request) {
+ t.Fatal("it should not be called")
+ }
+
+ storeServer := httptest.NewServer(storeServerMux)
+ defer storeServer.Close()
+
+ authResponse := &api.Response{
+ RemoteObject: api.RemoteObject{
+ StoreURL: storeServer.URL + "/url/put",
+ ID: "store-id",
+ Timeout: 1,
+ },
+ }
+
+ ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
+ defer ts.Close()
+
+ response := testUploadArtifactsFromTestZip(t, ts)
+ require.Equal(t, http.StatusInternalServerError, response.Code)
+ require.Equal(t, 1, putCalledTimes, "upload should be called only once")
+}
+
+func TestUploadHandlerMultipartUploadSizeLimit(t *testing.T) {
+ os, server := test.StartObjectStore()
+ defer server.Close()
+
+ err := os.InitiateMultipartUpload(test.ObjectPath)
+ require.NoError(t, err)
+
+ objectURL := server.URL + test.ObjectPath
+
+ uploadSize := 10
+ preauth := &api.Response{
+ RemoteObject: api.RemoteObject{
+ ID: "store-id",
+ MultipartUpload: &api.MultipartUploadParams{
+ PartSize: 1,
+ PartURLs: []string{objectURL + "?partNumber=1"},
+ AbortURL: objectURL, // DELETE
+ CompleteURL: objectURL, // POST
+ },
+ },
+ }
+
+ responseProcessor := func(w http.ResponseWriter, r *http.Request) {
+ t.Fatal("it should not be called")
+ }
+
+ ts := testArtifactsUploadServer(t, preauth, responseProcessor)
+ defer ts.Close()
+
+ contentBuffer, contentType := createTestMultipartForm(t, make([]byte, uploadSize))
+ response := testUploadArtifacts(t, contentType, ts.URL+Path, &contentBuffer)
+ require.Equal(t, http.StatusRequestEntityTooLarge, response.Code)
+ require.Eventually(t, func() bool {
+ return !os.IsMultipartUpload(test.ObjectPath)
+ }, time.Second, time.Millisecond, "MultipartUpload should not be in progress anymore")
+ require.Empty(t, os.GetObjectMD5(test.ObjectPath), "upload should have failed, so the object should not exist")
+}
+
+func TestUploadHandlerMultipartUploadMaximumSizeFromApi(t *testing.T) {
+ os, server := test.StartObjectStore()
+ defer server.Close()
+
+ err := os.InitiateMultipartUpload(test.ObjectPath)
+ require.NoError(t, err)
+
+ objectURL := server.URL + test.ObjectPath
+
+ uploadSize := int64(10)
+ maxSize := uploadSize - 1
+ preauth := &api.Response{
+ MaximumSize: maxSize,
+ RemoteObject: api.RemoteObject{
+ ID: "store-id",
+ MultipartUpload: &api.MultipartUploadParams{
+ PartSize: uploadSize,
+ PartURLs: []string{objectURL + "?partNumber=1"},
+ AbortURL: objectURL, // DELETE
+ CompleteURL: objectURL, // POST
+ },
+ },
+ }
+
+ responseProcessor := func(w http.ResponseWriter, r *http.Request) {
+ t.Fatal("it should not be called")
+ }
+
+ ts := testArtifactsUploadServer(t, preauth, responseProcessor)
+ defer ts.Close()
+
+ contentBuffer, contentType := createTestMultipartForm(t, make([]byte, uploadSize))
+ response := testUploadArtifacts(t, contentType, ts.URL+Path, &contentBuffer)
+ require.Equal(t, http.StatusRequestEntityTooLarge, response.Code)
+
+ testhelper.Retry(t, 5*time.Second, func() error {
+ if os.GetObjectMD5(test.ObjectPath) == "" {
+ return nil
+ }
+
+ return fmt.Errorf("file is still present")
+ })
+}
diff --git a/workhorse/internal/upload/artifacts_upload_test.go b/workhorse/internal/upload/artifacts_upload_test.go
new file mode 100644
index 00000000000..0a9e4ef3869
--- /dev/null
+++ b/workhorse/internal/upload/artifacts_upload_test.go
@@ -0,0 +1,331 @@
+package upload
+
+import (
+ "archive/zip"
+ "bytes"
+ "compress/gzip"
+ "encoding/json"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "mime/multipart"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "testing"
+
+ "github.com/golang-jwt/jwt/v4"
+
+ "gitlab.com/gitlab-org/labkit/log"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/proxy"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upstream/roundtripper"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/zipartifacts"
+
+ "github.com/stretchr/testify/require"
+)
+
+const (
+ MetadataHeaderKey = "Metadata-Status"
+ MetadataHeaderPresent = "present"
+ MetadataHeaderMissing = "missing"
+ Path = "/url/path"
+)
+
+func TestMain(m *testing.M) {
+ if err := testhelper.BuildExecutables(); err != nil {
+ log.WithError(err).Fatal()
+ }
+
+ os.Exit(m.Run())
+}
+
+func testArtifactsUploadServer(t *testing.T, authResponse *api.Response, bodyProcessor func(w http.ResponseWriter, r *http.Request)) *httptest.Server {
+ mux := http.NewServeMux()
+ mux.HandleFunc(Path+"/authorize", func(w http.ResponseWriter, r *http.Request) {
+ if r.Method != "POST" {
+ t.Fatal("Expected POST request")
+ }
+
+ w.Header().Set("Content-Type", api.ResponseContentType)
+
+ data, err := json.Marshal(&authResponse)
+ if err != nil {
+ t.Fatal("Expected to marshal")
+ }
+ w.Write(data)
+ })
+ mux.HandleFunc(Path, func(w http.ResponseWriter, r *http.Request) {
+ opts, err := destination.GetOpts(authResponse)
+ require.NoError(t, err)
+
+ if r.Method != "POST" {
+ t.Fatal("Expected POST request")
+ }
+ if opts.IsLocal() {
+ if r.FormValue("file.path") == "" {
+ t.Fatal("Expected file to be present")
+ return
+ }
+
+ _, err := ioutil.ReadFile(r.FormValue("file.path"))
+ if err != nil {
+ t.Fatal("Expected file to be readable")
+ return
+ }
+ } else {
+ if r.FormValue("file.remote_url") == "" {
+ t.Fatal("Expected file to be remote accessible")
+ return
+ }
+ }
+
+ if r.FormValue("metadata.path") != "" {
+ metadata, err := ioutil.ReadFile(r.FormValue("metadata.path"))
+ if err != nil {
+ t.Fatal("Expected metadata to be readable")
+ return
+ }
+ gz, err := gzip.NewReader(bytes.NewReader(metadata))
+ if err != nil {
+ t.Fatal("Expected metadata to be valid gzip")
+ return
+ }
+ defer gz.Close()
+ metadata, err = ioutil.ReadAll(gz)
+ if err != nil {
+ t.Fatal("Expected metadata to be valid")
+ return
+ }
+ if !bytes.HasPrefix(metadata, []byte(zipartifacts.MetadataHeaderPrefix+zipartifacts.MetadataHeader)) {
+ t.Fatal("Expected metadata to be of valid format")
+ return
+ }
+
+ w.Header().Set(MetadataHeaderKey, MetadataHeaderPresent)
+
+ } else {
+ w.Header().Set(MetadataHeaderKey, MetadataHeaderMissing)
+ }
+
+ w.WriteHeader(http.StatusOK)
+
+ if bodyProcessor != nil {
+ bodyProcessor(w, r)
+ }
+ })
+ return testhelper.TestServerWithHandler(nil, mux.ServeHTTP)
+}
+
+type testServer struct {
+ url string
+ writer *multipart.Writer
+ buffer *bytes.Buffer
+ fileWriter io.Writer
+ cleanup func()
+}
+
+func setupWithTmpPath(t *testing.T, filename string, includeFormat bool, format string, authResponse *api.Response, bodyProcessor func(w http.ResponseWriter, r *http.Request)) *testServer {
+ tempPath, err := ioutil.TempDir("", "uploads")
+ require.NoError(t, err)
+
+ if authResponse == nil {
+ authResponse = &api.Response{TempPath: tempPath}
+ }
+
+ ts := testArtifactsUploadServer(t, authResponse, bodyProcessor)
+
+ var buffer bytes.Buffer
+ writer := multipart.NewWriter(&buffer)
+ fileWriter, err := writer.CreateFormFile(filename, "my.file")
+ require.NotNil(t, fileWriter)
+ require.NoError(t, err)
+
+ cleanup := func() {
+ ts.Close()
+ require.NoError(t, os.RemoveAll(tempPath))
+ require.NoError(t, writer.Close())
+ }
+
+ qs := ""
+
+ if includeFormat {
+ qs = fmt.Sprintf("?%s=%s", ArtifactFormatKey, format)
+ }
+
+ return &testServer{url: ts.URL + Path + qs, writer: writer, buffer: &buffer, fileWriter: fileWriter, cleanup: cleanup}
+}
+
+func testUploadArtifacts(t *testing.T, contentType, url string, body io.Reader) *httptest.ResponseRecorder {
+ httpRequest, err := http.NewRequest("POST", url, body)
+ require.NoError(t, err)
+
+ httpRequest.Header.Set("Content-Type", contentType)
+ response := httptest.NewRecorder()
+ parsedURL := helper.URLMustParse(url)
+ roundTripper := roundtripper.NewTestBackendRoundTripper(parsedURL)
+ testhelper.ConfigureSecret()
+ apiClient := api.NewAPI(parsedURL, "123", roundTripper)
+ proxyClient := proxy.NewProxy(parsedURL, "123", roundTripper)
+ Artifacts(apiClient, proxyClient, &DefaultPreparer{}).ServeHTTP(response, httpRequest)
+ return response
+}
+
+func TestUploadHandlerAddingMetadata(t *testing.T) {
+ testCases := []struct {
+ desc string
+ format string
+ includeFormat bool
+ }{
+ {
+ desc: "ZIP format",
+ format: ArtifactFormatZip,
+ includeFormat: true,
+ },
+ {
+ desc: "default format",
+ format: ArtifactFormatDefault,
+ includeFormat: true,
+ },
+ {
+ desc: "default format without artifact_format",
+ format: ArtifactFormatDefault,
+ includeFormat: false,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ s := setupWithTmpPath(t, "file", tc.includeFormat, tc.format, nil,
+ func(w http.ResponseWriter, r *http.Request) {
+ token, err := jwt.ParseWithClaims(r.Header.Get(RewrittenFieldsHeader), &MultipartClaims{}, testhelper.ParseJWT)
+ require.NoError(t, err)
+
+ rewrittenFields := token.Claims.(*MultipartClaims).RewrittenFields
+ require.Equal(t, 2, len(rewrittenFields))
+
+ require.Contains(t, rewrittenFields, "file")
+ require.Contains(t, rewrittenFields, "metadata")
+ require.Contains(t, r.PostForm, "file.gitlab-workhorse-upload")
+ require.Contains(t, r.PostForm, "metadata.gitlab-workhorse-upload")
+ },
+ )
+ defer s.cleanup()
+
+ archive := zip.NewWriter(s.fileWriter)
+ file, err := archive.Create("test.file")
+ require.NotNil(t, file)
+ require.NoError(t, err)
+
+ require.NoError(t, archive.Close())
+ require.NoError(t, s.writer.Close())
+
+ response := testUploadArtifacts(t, s.writer.FormDataContentType(), s.url, s.buffer)
+ require.Equal(t, http.StatusOK, response.Code)
+ testhelper.RequireResponseHeader(t, response, MetadataHeaderKey, MetadataHeaderPresent)
+ })
+ }
+}
+
+func TestUploadHandlerTarArtifact(t *testing.T) {
+ s := setupWithTmpPath(t, "file", true, "tar", nil,
+ func(w http.ResponseWriter, r *http.Request) {
+ token, err := jwt.ParseWithClaims(r.Header.Get(RewrittenFieldsHeader), &MultipartClaims{}, testhelper.ParseJWT)
+ require.NoError(t, err)
+
+ rewrittenFields := token.Claims.(*MultipartClaims).RewrittenFields
+ require.Equal(t, 1, len(rewrittenFields))
+
+ require.Contains(t, rewrittenFields, "file")
+ require.Contains(t, r.PostForm, "file.gitlab-workhorse-upload")
+ },
+ )
+ defer s.cleanup()
+
+ file, err := os.Open("../../testdata/tarfile.tar")
+ require.NoError(t, err)
+
+ _, err = io.Copy(s.fileWriter, file)
+ require.NoError(t, err)
+ require.NoError(t, file.Close())
+ require.NoError(t, s.writer.Close())
+
+ response := testUploadArtifacts(t, s.writer.FormDataContentType(), s.url, s.buffer)
+ require.Equal(t, http.StatusOK, response.Code)
+ testhelper.RequireResponseHeader(t, response, MetadataHeaderKey, MetadataHeaderMissing)
+}
+
+func TestUploadHandlerForUnsupportedArchive(t *testing.T) {
+ s := setupWithTmpPath(t, "file", true, "other", nil, nil)
+ defer s.cleanup()
+ require.NoError(t, s.writer.Close())
+
+ response := testUploadArtifacts(t, s.writer.FormDataContentType(), s.url, s.buffer)
+ require.Equal(t, http.StatusOK, response.Code)
+ testhelper.RequireResponseHeader(t, response, MetadataHeaderKey, MetadataHeaderMissing)
+}
+
+func TestUploadHandlerForMultipleFiles(t *testing.T) {
+ s := setupWithTmpPath(t, "file", true, "", nil, nil)
+ defer s.cleanup()
+
+ file, err := s.writer.CreateFormFile("file", "my.file")
+ require.NotNil(t, file)
+ require.NoError(t, err)
+ require.NoError(t, s.writer.Close())
+
+ response := testUploadArtifacts(t, s.writer.FormDataContentType(), s.url, s.buffer)
+ require.Equal(t, http.StatusInternalServerError, response.Code)
+}
+
+func TestUploadFormProcessing(t *testing.T) {
+ s := setupWithTmpPath(t, "metadata", true, "", nil, nil)
+ defer s.cleanup()
+ require.NoError(t, s.writer.Close())
+
+ response := testUploadArtifacts(t, s.writer.FormDataContentType(), s.url, s.buffer)
+ require.Equal(t, http.StatusInternalServerError, response.Code)
+}
+
+func TestLsifFileProcessing(t *testing.T) {
+ tempPath, err := ioutil.TempDir("", "uploads")
+ require.NoError(t, err)
+
+ s := setupWithTmpPath(t, "file", true, "zip", &api.Response{TempPath: tempPath, ProcessLsif: true}, nil)
+ defer s.cleanup()
+
+ file, err := os.Open("../../testdata/lsif/valid.lsif.zip")
+ require.NoError(t, err)
+
+ _, err = io.Copy(s.fileWriter, file)
+ require.NoError(t, err)
+ require.NoError(t, file.Close())
+ require.NoError(t, s.writer.Close())
+
+ response := testUploadArtifacts(t, s.writer.FormDataContentType(), s.url, s.buffer)
+ require.Equal(t, http.StatusOK, response.Code)
+ testhelper.RequireResponseHeader(t, response, MetadataHeaderKey, MetadataHeaderPresent)
+}
+
+func TestInvalidLsifFileProcessing(t *testing.T) {
+ tempPath, err := ioutil.TempDir("", "uploads")
+ require.NoError(t, err)
+
+ s := setupWithTmpPath(t, "file", true, "zip", &api.Response{TempPath: tempPath, ProcessLsif: true}, nil)
+ defer s.cleanup()
+
+ file, err := os.Open("../../testdata/lsif/invalid.lsif.zip")
+ require.NoError(t, err)
+
+ _, err = io.Copy(s.fileWriter, file)
+ require.NoError(t, err)
+ require.NoError(t, file.Close())
+ require.NoError(t, s.writer.Close())
+
+ response := testUploadArtifacts(t, s.writer.FormDataContentType(), s.url, s.buffer)
+ require.Equal(t, http.StatusInternalServerError, response.Code)
+}
diff --git a/workhorse/internal/upload/artifacts_uploader.go b/workhorse/internal/upload/artifacts_uploader.go
new file mode 100644
index 00000000000..2a91a05fe3d
--- /dev/null
+++ b/workhorse/internal/upload/artifacts_uploader.go
@@ -0,0 +1,167 @@
+package upload
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "mime/multipart"
+ "net/http"
+ "os"
+ "os/exec"
+ "strings"
+ "syscall"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+ "gitlab.com/gitlab-org/labkit/log"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/zipartifacts"
+)
+
+// Sent by the runner: https://gitlab.com/gitlab-org/gitlab-runner/blob/c24da19ecce8808d9d2950896f70c94f5ea1cc2e/network/gitlab.go#L580
+const (
+ ArtifactFormatKey = "artifact_format"
+ ArtifactFormatZip = "zip"
+ ArtifactFormatDefault = ""
+)
+
+var zipSubcommandsErrorsCounter = promauto.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitlab_workhorse_zip_subcommand_errors_total",
+ Help: "Errors comming from subcommands used for processing ZIP archives",
+ }, []string{"error"})
+
+type artifactsUploadProcessor struct {
+ opts *destination.UploadOpts
+ format string
+
+ SavedFileTracker
+}
+
+// Artifacts is like Multipart but specific to artifacts uploads.
+func Artifacts(myAPI *api.API, h http.Handler, p Preparer) http.Handler {
+ return myAPI.PreAuthorizeHandler(func(w http.ResponseWriter, r *http.Request, a *api.Response) {
+ opts, _, err := p.Prepare(a)
+ if err != nil {
+ helper.Fail500(w, r, fmt.Errorf("UploadArtifacts: error preparing file storage options"))
+ return
+ }
+
+ format := r.URL.Query().Get(ArtifactFormatKey)
+
+ mg := &artifactsUploadProcessor{opts: opts, format: format, SavedFileTracker: SavedFileTracker{Request: r}}
+ interceptMultipartFiles(w, r, h, a, mg, opts)
+ }, "/authorize")
+}
+
+func (a *artifactsUploadProcessor) generateMetadataFromZip(ctx context.Context, file *destination.FileHandler) (*destination.FileHandler, error) {
+ metaReader, metaWriter := io.Pipe()
+ defer metaWriter.Close()
+
+ metaOpts := &destination.UploadOpts{
+ LocalTempPath: a.opts.LocalTempPath,
+ TempFilePrefix: "metadata.gz",
+ }
+ if metaOpts.LocalTempPath == "" {
+ metaOpts.LocalTempPath = os.TempDir()
+ }
+
+ fileName := file.LocalPath
+ if fileName == "" {
+ fileName = file.RemoteURL
+ }
+
+ zipMd := exec.CommandContext(ctx, "gitlab-zip-metadata", fileName)
+ zipMd.Stderr = log.ContextLogger(ctx).Writer()
+ zipMd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+ zipMd.Stdout = metaWriter
+
+ if err := zipMd.Start(); err != nil {
+ return nil, err
+ }
+ defer helper.CleanUpProcessGroup(zipMd)
+
+ type saveResult struct {
+ error
+ *destination.FileHandler
+ }
+ done := make(chan saveResult)
+ go func() {
+ var result saveResult
+ result.FileHandler, result.error = destination.Upload(ctx, metaReader, -1, metaOpts)
+
+ done <- result
+ }()
+
+ if err := zipMd.Wait(); err != nil {
+ st, ok := helper.ExitStatus(err)
+
+ if !ok {
+ return nil, err
+ }
+
+ zipSubcommandsErrorsCounter.WithLabelValues(zipartifacts.ErrorLabelByCode(st)).Inc()
+
+ if st == zipartifacts.CodeNotZip {
+ return nil, nil
+ }
+
+ if st == zipartifacts.CodeLimitsReached {
+ return nil, zipartifacts.ErrBadMetadata
+ }
+ }
+
+ metaWriter.Close()
+ result := <-done
+ return result.FileHandler, result.error
+}
+
+func (a *artifactsUploadProcessor) ProcessFile(ctx context.Context, formName string, file *destination.FileHandler, writer *multipart.Writer) error {
+ // ProcessFile for artifacts requires the file form-data field name to equal `file`
+
+ if formName != "file" {
+ return fmt.Errorf("invalid form field: %q", formName)
+ }
+ if a.Count() > 0 {
+ return fmt.Errorf("artifacts request contains more than one file")
+ }
+ a.Track(formName, file.LocalPath)
+
+ select {
+ case <-ctx.Done():
+ return fmt.Errorf("ProcessFile: context done")
+ default:
+ }
+
+ if !strings.EqualFold(a.format, ArtifactFormatZip) && a.format != ArtifactFormatDefault {
+ return nil
+ }
+
+ // TODO: can we rely on disk for shipping metadata? Not if we split workhorse and rails into two different pods
+ metadata, err := a.generateMetadataFromZip(ctx, file)
+ if err != nil {
+ return err
+ }
+
+ if metadata != nil {
+ fields, err := metadata.GitLabFinalizeFields("metadata")
+ if err != nil {
+ return fmt.Errorf("finalize metadata field error: %v", err)
+ }
+
+ for k, v := range fields {
+ writer.WriteField(k, v)
+ }
+
+ a.Track("metadata", metadata.LocalPath)
+ }
+
+ return nil
+}
+
+func (a *artifactsUploadProcessor) Name() string {
+ return "artifacts"
+}
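The metadata step above streams the output of the gitlab-zip-metadata subcommand into a second upload through an io.Pipe while the command is still running. Below is a minimal, self-contained sketch of that pipe-to-subprocess pattern; the package and function names are illustrative assumptions, not part of Workhorse.

package sketch

import (
	"context"
	"io"
	"io/ioutil"
	"os/exec"
)

// runAndConsume starts a command, wires its stdout to a pipe, and collects
// the output concurrently, mirroring the shape of generateMetadataFromZip.
func runAndConsume(ctx context.Context, name string, args ...string) ([]byte, error) {
	r, w := io.Pipe()
	cmd := exec.CommandContext(ctx, name, args...)
	cmd.Stdout = w

	if err := cmd.Start(); err != nil {
		return nil, err
	}

	type result struct {
		data []byte
		err  error
	}
	done := make(chan result, 1)
	go func() {
		// The consumer drains the pipe while the command runs, so the
		// command never blocks on a full pipe buffer.
		data, err := ioutil.ReadAll(r)
		done <- result{data, err}
	}()

	waitErr := cmd.Wait()
	w.Close() // signal EOF to the reader once the command has exited
	res := <-done
	if waitErr != nil {
		return nil, waitErr
	}
	return res.data, res.err
}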
diff --git a/workhorse/internal/upload/body_uploader.go b/workhorse/internal/upload/body_uploader.go
index 6c53bd9241b..d831f9f43a1 100644
--- a/workhorse/internal/upload/body_uploader.go
+++ b/workhorse/internal/upload/body_uploader.go
@@ -8,41 +8,10 @@ import (
"strings"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
)
-type PreAuthorizer interface {
- PreAuthorizeHandler(next api.HandleFunc, suffix string) http.Handler
-}
-
-// Verifier is an optional pluggable behavior for upload paths. If
-// Verify() returns an error, Workhorse will return an error response to
-// the client instead of propagating the request to Rails. The motivating
-// use case is Git LFS, where Workhorse checks the size and SHA256
-// checksum of the uploaded file.
-type Verifier interface {
- // Verify can abort the upload by returning an error
- Verify(handler *filestore.FileHandler) error
-}
-
-// Preparer is a pluggable behavior that interprets a Rails API response
-// and either tells Workhorse how to handle the upload, via the
-// SaveFileOpts and Verifier, or it rejects the request by returning a
-// non-nil error. Its intended use is to make sure the upload gets stored
-// in the right location: either a local directory, or one of several
-// supported object storage backends.
-type Preparer interface {
- Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error)
-}
-
-type DefaultPreparer struct{}
-
-func (s *DefaultPreparer) Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error) {
- opts, err := filestore.GetOpts(a)
- return opts, nil, err
-}
-
// RequestBody is a request middleware. It will store the request body at
// a location determined by an api.Response value. It then forwards the
// request to gitlab-rails without the original request body.
@@ -54,7 +23,7 @@ func RequestBody(rails PreAuthorizer, h http.Handler, p Preparer) http.Handler {
return
}
- fh, err := filestore.SaveFileFromReader(r.Context(), r.Body, r.ContentLength, opts)
+ fh, err := destination.Upload(r.Context(), r.Body, r.ContentLength, opts)
if err != nil {
helper.Fail500(w, r, fmt.Errorf("RequestBody: upload failed: %v", err))
return
diff --git a/workhorse/internal/upload/body_uploader_test.go b/workhorse/internal/upload/body_uploader_test.go
index b3d561ac131..47490db8780 100644
--- a/workhorse/internal/upload/body_uploader_test.go
+++ b/workhorse/internal/upload/body_uploader_test.go
@@ -15,8 +15,8 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
)
const (
@@ -169,8 +169,8 @@ type alwaysLocalPreparer struct {
prepareError error
}
-func (a *alwaysLocalPreparer) Prepare(_ *api.Response) (*filestore.SaveFileOpts, Verifier, error) {
- opts, err := filestore.GetOpts(&api.Response{TempPath: os.TempDir()})
+func (a *alwaysLocalPreparer) Prepare(_ *api.Response) (*destination.UploadOpts, Verifier, error) {
+ opts, err := destination.GetOpts(&api.Response{TempPath: os.TempDir()})
if err != nil {
return nil, nil, err
}
@@ -180,7 +180,7 @@ func (a *alwaysLocalPreparer) Prepare(_ *api.Response) (*filestore.SaveFileOpts,
type alwaysFailsVerifier struct{}
-func (alwaysFailsVerifier) Verify(handler *filestore.FileHandler) error {
+func (alwaysFailsVerifier) Verify(handler *destination.FileHandler) error {
return fmt.Errorf("Verification failed")
}
@@ -188,7 +188,7 @@ type mockVerifier struct {
invoked bool
}
-func (m *mockVerifier) Verify(handler *filestore.FileHandler) error {
+func (m *mockVerifier) Verify(handler *destination.FileHandler) error {
m.invoked = true
return nil
diff --git a/workhorse/internal/upload/destination/destination.go b/workhorse/internal/upload/destination/destination.go
new file mode 100644
index 00000000000..7a030e59a64
--- /dev/null
+++ b/workhorse/internal/upload/destination/destination.go
@@ -0,0 +1,236 @@
+// The destination package handles uploading to a specific destination (delegating
+// to the filestore or objectstore packages) based on options from the
+// pre-authorization API, and finalizing the upload.
+package destination
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "strconv"
+ "time"
+
+ "github.com/golang-jwt/jwt/v4"
+
+ "gitlab.com/gitlab-org/labkit/log"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/secret"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/filestore"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
+)
+
+type SizeError error
+
+// ErrEntityTooLarge means that the uploaded content is bigger than the maximum allowed size
+var ErrEntityTooLarge = errors.New("entity is too large")
+
+// FileHandler represents a file that has been processed for upload.
+// It may have been uploaded to an ObjectStore and/or saved on a local path.
+type FileHandler struct {
+ // LocalPath is the path on the disk where the file has been stored
+ LocalPath string
+
+ // RemoteID is the objectID provided by GitLab Rails
+ RemoteID string
+ // RemoteURL is ObjectStore URL provided by GitLab Rails
+ RemoteURL string
+
+ // Size is the persisted file size
+ Size int64
+
+ // Name is the resource name to send back to GitLab rails.
+ // It differs from the real file name in order to avoid file collisions
+ Name string
+
+ // a map containing different hashes
+ hashes map[string]string
+
+ // Duration of upload in seconds
+ uploadDuration float64
+}
+
+type uploadClaims struct {
+ Upload map[string]string `json:"upload"`
+ jwt.StandardClaims
+}
+
+// SHA256 hash of the handled file
+func (fh *FileHandler) SHA256() string {
+ return fh.hashes["sha256"]
+}
+
+// MD5 hash of the handled file
+func (fh *FileHandler) MD5() string {
+ return fh.hashes["md5"]
+}
+
+// GitLabFinalizeFields returns a map with all the fields GitLab Rails needs in order to finalize the upload.
+func (fh *FileHandler) GitLabFinalizeFields(prefix string) (map[string]string, error) {
+ // TODO: remove the `data` fields once rails fully and exclusively supports `signedData` (https://gitlab.com/gitlab-org/gitlab/-/issues/324873)
+ data := make(map[string]string)
+ signedData := make(map[string]string)
+ key := func(field string) string {
+ if prefix == "" {
+ return field
+ }
+
+ return fmt.Sprintf("%s.%s", prefix, field)
+ }
+
+ for k, v := range map[string]string{
+ "name": fh.Name,
+ "path": fh.LocalPath,
+ "remote_url": fh.RemoteURL,
+ "remote_id": fh.RemoteID,
+ "size": strconv.FormatInt(fh.Size, 10),
+ "upload_duration": strconv.FormatFloat(fh.uploadDuration, 'f', -1, 64),
+ } {
+ data[key(k)] = v
+ signedData[k] = v
+ }
+
+ for hashName, hash := range fh.hashes {
+ data[key(hashName)] = hash
+ signedData[hashName] = hash
+ }
+
+ claims := uploadClaims{Upload: signedData, StandardClaims: secret.DefaultClaims}
+ jwtData, err := secret.JWTTokenString(claims)
+ if err != nil {
+ return nil, err
+ }
+ data[key("gitlab-workhorse-upload")] = jwtData
+
+ return data, nil
+}
+
+type consumer interface {
+ Consume(context.Context, io.Reader, time.Time) (int64, error)
+}
+
+// Upload persists the provided reader content to all the locations specified in opts. A cleanup is performed once ctx is Done.
+// Make sure the provided context will not expire before the upload is finalized with GitLab Rails.
+func Upload(ctx context.Context, reader io.Reader, size int64, opts *UploadOpts) (*FileHandler, error) {
+ fh := &FileHandler{
+ Name: opts.TempFilePrefix,
+ RemoteID: opts.RemoteID,
+ RemoteURL: opts.RemoteURL,
+ }
+ uploadStartTime := time.Now()
+ defer func() { fh.uploadDuration = time.Since(uploadStartTime).Seconds() }()
+ hashes := newMultiHash()
+ reader = io.TeeReader(reader, hashes.Writer)
+
+ var clientMode string
+ var uploadDestination consumer
+ var err error
+ switch {
+ case opts.IsLocal():
+ clientMode = "local"
+ uploadDestination, err = fh.newLocalFile(ctx, opts)
+ case opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsGoCloud():
+ clientMode = fmt.Sprintf("go_cloud:%s", opts.ObjectStorageConfig.Provider)
+ p := &objectstore.GoCloudObjectParams{
+ Ctx: ctx,
+ Mux: opts.ObjectStorageConfig.URLMux,
+ BucketURL: opts.ObjectStorageConfig.GoCloudConfig.URL,
+ ObjectName: opts.RemoteTempObjectID,
+ }
+ uploadDestination, err = objectstore.NewGoCloudObject(p)
+ case opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsAWS() && opts.ObjectStorageConfig.IsValid():
+ clientMode = "s3"
+ uploadDestination, err = objectstore.NewS3Object(
+ opts.RemoteTempObjectID,
+ opts.ObjectStorageConfig.S3Credentials,
+ opts.ObjectStorageConfig.S3Config,
+ )
+ case opts.IsMultipart():
+ clientMode = "multipart"
+ uploadDestination, err = objectstore.NewMultipart(
+ opts.PresignedParts,
+ opts.PresignedCompleteMultipart,
+ opts.PresignedAbortMultipart,
+ opts.PresignedDelete,
+ opts.PutHeaders,
+ opts.PartSize,
+ )
+ default:
+ clientMode = "http"
+ uploadDestination, err = objectstore.NewObject(
+ opts.PresignedPut,
+ opts.PresignedDelete,
+ opts.PutHeaders,
+ size,
+ )
+ }
+
+ if err != nil {
+ return nil, err
+ }
+
+ var hlr *hardLimitReader
+ if opts.MaximumSize > 0 {
+ if size > opts.MaximumSize {
+ return nil, SizeError(fmt.Errorf("the upload size %d is over maximum of %d bytes", size, opts.MaximumSize))
+ }
+
+ hlr = &hardLimitReader{r: reader, n: opts.MaximumSize}
+ reader = hlr
+ }
+
+ fh.Size, err = uploadDestination.Consume(ctx, reader, opts.Deadline)
+ if err != nil {
+ if (err == objectstore.ErrNotEnoughParts) || (hlr != nil && hlr.n < 0) {
+ err = ErrEntityTooLarge
+ }
+ return nil, err
+ }
+
+ if size != -1 && size != fh.Size {
+ return nil, SizeError(fmt.Errorf("expected %d bytes but got only %d", size, fh.Size))
+ }
+
+ logger := log.WithContextFields(ctx, log.Fields{
+ "copied_bytes": fh.Size,
+ "is_local": opts.IsLocal(),
+ "is_multipart": opts.IsMultipart(),
+ "is_remote": !opts.IsLocal(),
+ "remote_id": opts.RemoteID,
+ "temp_file_prefix": opts.TempFilePrefix,
+ "client_mode": clientMode,
+ })
+
+ if opts.IsLocal() {
+ logger = logger.WithField("local_temp_path", opts.LocalTempPath)
+ } else {
+ logger = logger.WithField("remote_temp_object", opts.RemoteTempObjectID)
+ }
+
+ logger.Info("saved file")
+ fh.hashes = hashes.finish()
+ return fh, nil
+}
+
+func (fh *FileHandler) newLocalFile(ctx context.Context, opts *UploadOpts) (consumer, error) {
+ // make sure the local temp directory exists
+ err := os.MkdirAll(opts.LocalTempPath, 0700)
+ if err != nil {
+ return nil, fmt.Errorf("newLocalFile: mkdir %q: %v", opts.LocalTempPath, err)
+ }
+
+ file, err := ioutil.TempFile(opts.LocalTempPath, opts.TempFilePrefix)
+ if err != nil {
+ return nil, fmt.Errorf("newLocalFile: create file: %v", err)
+ }
+
+ go func() {
+ <-ctx.Done()
+ os.Remove(file.Name())
+ }()
+
+ fh.LocalPath = file.Name()
+ return &filestore.LocalFile{File: file}, nil
+}
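destination.Upload is the single entry point callers use. As a hypothetical caller sketch (the handler name and prefix are assumptions; the destination API is as defined above), saving a request body to a local temp file could look like this. Note the temp file is removed asynchronously once the request context is done, so it must be consumed before the request finishes.

package sketch

import (
	"fmt"
	"net/http"
	"os"

	"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
)

// saveToTemp is a hypothetical caller: it persists a request body to a local
// temp file and returns the fields GitLab Rails needs to finalize the upload.
func saveToTemp(r *http.Request) (map[string]string, error) {
	opts := &destination.UploadOpts{
		LocalTempPath:  os.TempDir(), // opts.IsLocal() is true when this is set
		TempFilePrefix: "upload",
	}

	fh, err := destination.Upload(r.Context(), r.Body, r.ContentLength, opts)
	if err != nil {
		if err == destination.ErrEntityTooLarge {
			return nil, fmt.Errorf("upload rejected: %v", err)
		}
		return nil, err
	}

	// "file.gitlab-workhorse-upload" in the returned map carries the
	// JWT-signed variant of these fields.
	return fh.GitLabFinalizeFields("file")
}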
diff --git a/workhorse/internal/upload/destination/destination_test.go b/workhorse/internal/upload/destination/destination_test.go
new file mode 100644
index 00000000000..ddf0ea24d60
--- /dev/null
+++ b/workhorse/internal/upload/destination/destination_test.go
@@ -0,0 +1,504 @@
+package destination_test
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path"
+ "strconv"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/golang-jwt/jwt/v4"
+ "github.com/stretchr/testify/require"
+ "gocloud.dev/blob"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
+)
+
+func testDeadline() time.Time {
+ return time.Now().Add(destination.DefaultObjectStoreTimeout)
+}
+
+func requireFileGetsRemovedAsync(t *testing.T, filePath string) {
+ var err error
+ require.Eventually(t, func() bool {
+ _, err = os.Stat(filePath)
+ return err != nil
+ }, 10*time.Second, 10*time.Millisecond)
+ require.True(t, os.IsNotExist(err), "File hasn't been deleted during cleanup")
+}
+
+func requireObjectStoreDeletedAsync(t *testing.T, expectedDeletes int, osStub *test.ObjectstoreStub) {
+ require.Eventually(t, func() bool { return osStub.DeletesCnt() == expectedDeletes }, time.Second, time.Millisecond, "Object not deleted")
+}
+
+func TestUploadWrongSize(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp")
+ require.NoError(t, err)
+ defer os.RemoveAll(tmpFolder)
+
+ opts := &destination.UploadOpts{LocalTempPath: tmpFolder, TempFilePrefix: "test-file"}
+ fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize+1, opts)
+ require.Error(t, err)
+ _, isSizeError := err.(destination.SizeError)
+ require.True(t, isSizeError, "Should fail with SizeError")
+ require.Nil(t, fh)
+}
+
+func TestUploadWithKnownSizeExceedLimit(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp")
+ require.NoError(t, err)
+ defer os.RemoveAll(tmpFolder)
+
+ opts := &destination.UploadOpts{LocalTempPath: tmpFolder, TempFilePrefix: "test-file", MaximumSize: test.ObjectSize - 1}
+ fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, opts)
+ require.Error(t, err)
+ _, isSizeError := err.(destination.SizeError)
+ require.True(t, isSizeError, "Should fail with SizeError")
+ require.Nil(t, fh)
+}
+
+func TestUploadWithUnknownSizeExceedLimit(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp")
+ require.NoError(t, err)
+ defer os.RemoveAll(tmpFolder)
+
+ opts := &destination.UploadOpts{LocalTempPath: tmpFolder, TempFilePrefix: "test-file", MaximumSize: test.ObjectSize - 1}
+ fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), -1, opts)
+ require.Equal(t, err, destination.ErrEntityTooLarge)
+ require.Nil(t, fh)
+}
+
+func TestUploadWrongETag(t *testing.T) {
+ tests := []struct {
+ name string
+ multipart bool
+ }{
+ {name: "single part"},
+ {name: "multi part", multipart: true},
+ }
+
+ for _, spec := range tests {
+ t.Run(spec.name, func(t *testing.T) {
+ osStub, ts := test.StartObjectStoreWithCustomMD5(map[string]string{test.ObjectPath: "brokenMD5"})
+ defer ts.Close()
+
+ objectURL := ts.URL + test.ObjectPath
+
+ opts := &destination.UploadOpts{
+ RemoteID: "test-file",
+ RemoteURL: objectURL,
+ PresignedPut: objectURL + "?Signature=ASignature",
+ PresignedDelete: objectURL + "?Signature=AnotherSignature",
+ Deadline: testDeadline(),
+ }
+ if spec.multipart {
+ opts.PresignedParts = []string{objectURL + "?partNumber=1"}
+ opts.PresignedCompleteMultipart = objectURL + "?Signature=CompleteSig"
+ opts.PresignedAbortMultipart = objectURL + "?Signature=AbortSig"
+ opts.PartSize = test.ObjectSize
+
+ osStub.InitiateMultipartUpload(test.ObjectPath)
+ }
+ ctx, cancel := context.WithCancel(context.Background())
+ fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, opts)
+ require.Nil(t, fh)
+ require.Error(t, err)
+ require.Equal(t, 1, osStub.PutsCnt(), "File not uploaded")
+
+ cancel() // this will trigger an async cleanup
+ requireObjectStoreDeletedAsync(t, 1, osStub)
+ require.False(t, spec.multipart && osStub.IsMultipartUpload(test.ObjectPath), "there must be no multipart upload in progress now")
+ })
+ }
+}
+
+func TestUpload(t *testing.T) {
+ testhelper.ConfigureSecret()
+
+ type remote int
+ const (
+ notRemote remote = iota
+ remoteSingle
+ remoteMultipart
+ )
+
+ tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp")
+ require.NoError(t, err)
+ defer os.RemoveAll(tmpFolder)
+
+ tests := []struct {
+ name string
+ local bool
+ remote remote
+ }{
+ {name: "Local only", local: true},
+ {name: "Remote Single only", remote: remoteSingle},
+ {name: "Remote Multipart only", remote: remoteMultipart},
+ }
+
+ for _, spec := range tests {
+ t.Run(spec.name, func(t *testing.T) {
+ var opts destination.UploadOpts
+ var expectedDeletes, expectedPuts int
+
+ osStub, ts := test.StartObjectStore()
+ defer ts.Close()
+
+ switch spec.remote {
+ case remoteSingle:
+ objectURL := ts.URL + test.ObjectPath
+
+ opts.RemoteID = "test-file"
+ opts.RemoteURL = objectURL
+ opts.PresignedPut = objectURL + "?Signature=ASignature"
+ opts.PresignedDelete = objectURL + "?Signature=AnotherSignature"
+ opts.Deadline = testDeadline()
+
+ expectedDeletes = 1
+ expectedPuts = 1
+ case remoteMultipart:
+ objectURL := ts.URL + test.ObjectPath
+
+ opts.RemoteID = "test-file"
+ opts.RemoteURL = objectURL
+ opts.PresignedDelete = objectURL + "?Signature=AnotherSignature"
+ opts.PartSize = int64(len(test.ObjectContent)/2) + 1
+ opts.PresignedParts = []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"}
+ opts.PresignedCompleteMultipart = objectURL + "?Signature=CompleteSignature"
+ opts.Deadline = testDeadline()
+
+ osStub.InitiateMultipartUpload(test.ObjectPath)
+ expectedDeletes = 1
+ expectedPuts = 2
+ }
+
+ if spec.local {
+ opts.LocalTempPath = tmpFolder
+ opts.TempFilePrefix = "test-file"
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
+ require.NoError(t, err)
+ require.NotNil(t, fh)
+
+ require.Equal(t, opts.RemoteID, fh.RemoteID)
+ require.Equal(t, opts.RemoteURL, fh.RemoteURL)
+
+ if spec.local {
+ require.NotEmpty(t, fh.LocalPath, "File not persisted on disk")
+ _, err := os.Stat(fh.LocalPath)
+ require.NoError(t, err)
+
+ dir := path.Dir(fh.LocalPath)
+ require.Equal(t, opts.LocalTempPath, dir)
+ filename := path.Base(fh.LocalPath)
+ beginsWithPrefix := strings.HasPrefix(filename, opts.TempFilePrefix)
+ require.True(t, beginsWithPrefix, fmt.Sprintf("LocalPath filename %q does not begin with TempFilePrefix %q", filename, opts.TempFilePrefix))
+ } else {
+ require.Empty(t, fh.LocalPath, "LocalPath must be empty for non local uploads")
+ }
+
+ require.Equal(t, test.ObjectSize, fh.Size)
+ require.Equal(t, test.ObjectMD5, fh.MD5())
+ require.Equal(t, test.ObjectSHA256, fh.SHA256())
+
+ require.Equal(t, expectedPuts, osStub.PutsCnt(), "ObjectStore PutObject count mismatch")
+ require.Equal(t, 0, osStub.DeletesCnt(), "File deleted too early")
+
+ cancel() // this will trigger an async cleanup
+ requireObjectStoreDeletedAsync(t, expectedDeletes, osStub)
+ requireFileGetsRemovedAsync(t, fh.LocalPath)
+
+ // checking generated fields
+ fields, err := fh.GitLabFinalizeFields("file")
+ require.NoError(t, err)
+
+ checkFileHandlerWithFields(t, fh, fields, "file")
+
+ token, jwtErr := jwt.ParseWithClaims(fields["file.gitlab-workhorse-upload"], &testhelper.UploadClaims{}, testhelper.ParseJWT)
+ require.NoError(t, jwtErr)
+
+ uploadFields := token.Claims.(*testhelper.UploadClaims).Upload
+
+ checkFileHandlerWithFields(t, fh, uploadFields, "")
+ })
+ }
+}
+
+func TestUploadWithS3WorkhorseClient(t *testing.T) {
+ tests := []struct {
+ name string
+ objectSize int64
+ maxSize int64
+ expectedErr error
+ }{
+ {
+ name: "known size with no limit",
+ objectSize: test.ObjectSize,
+ },
+ {
+ name: "unknown size with no limit",
+ objectSize: -1,
+ },
+ {
+ name: "unknown object size with limit",
+ objectSize: -1,
+ maxSize: test.ObjectSize - 1,
+ expectedErr: destination.ErrEntityTooLarge,
+ },
+ }
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+
+ s3Creds, s3Config, sess, ts := test.SetupS3(t, "")
+ defer ts.Close()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ remoteObject := "tmp/test-file/1"
+ opts := destination.UploadOpts{
+ RemoteID: "test-file",
+ Deadline: testDeadline(),
+ UseWorkhorseClient: true,
+ RemoteTempObjectID: remoteObject,
+ ObjectStorageConfig: destination.ObjectStorageConfig{
+ Provider: "AWS",
+ S3Credentials: s3Creds,
+ S3Config: s3Config,
+ },
+ MaximumSize: tc.maxSize,
+ }
+
+ _, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), tc.objectSize, &opts)
+
+ if tc.expectedErr == nil {
+ require.NoError(t, err)
+ test.S3ObjectExists(t, sess, s3Config, remoteObject, test.ObjectContent)
+ } else {
+ require.Equal(t, tc.expectedErr, err)
+ test.S3ObjectDoesNotExist(t, sess, s3Config, remoteObject)
+ }
+ })
+ }
+}
+
+func TestUploadWithAzureWorkhorseClient(t *testing.T) {
+ mux, bucketDir, cleanup := test.SetupGoCloudFileBucket(t, "azblob")
+ defer cleanup()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ remoteObject := "tmp/test-file/1"
+ opts := destination.UploadOpts{
+ RemoteID: "test-file",
+ Deadline: testDeadline(),
+ UseWorkhorseClient: true,
+ RemoteTempObjectID: remoteObject,
+ ObjectStorageConfig: destination.ObjectStorageConfig{
+ Provider: "AzureRM",
+ URLMux: mux,
+ GoCloudConfig: config.GoCloudConfig{URL: "azblob://test-container"},
+ },
+ }
+
+ _, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
+ require.NoError(t, err)
+
+ test.GoCloudObjectExists(t, bucketDir, remoteObject)
+}
+
+func TestUploadWithUnknownGoCloudScheme(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ mux := new(blob.URLMux)
+
+ remoteObject := "tmp/test-file/1"
+ opts := destination.UploadOpts{
+ RemoteID: "test-file",
+ Deadline: testDeadline(),
+ UseWorkhorseClient: true,
+ RemoteTempObjectID: remoteObject,
+ ObjectStorageConfig: destination.ObjectStorageConfig{
+ Provider: "SomeCloud",
+ URLMux: mux,
+ GoCloudConfig: config.GoCloudConfig{URL: "foo://test-container"},
+ },
+ }
+
+ _, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
+ require.Error(t, err)
+}
+
+func TestUploadMultipartInBodyFailure(t *testing.T) {
+ osStub, ts := test.StartObjectStore()
+ defer ts.Close()
+
+ // this is a broken path because it contains a bucket name but no object key
+ // this is the only way to get an in-body failure from our ObjectStoreStub
+ objectPath := "/bucket-but-no-object-key"
+ objectURL := ts.URL + objectPath
+ opts := destination.UploadOpts{
+ RemoteID: "test-file",
+ RemoteURL: objectURL,
+ PartSize: test.ObjectSize,
+ PresignedParts: []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"},
+ PresignedCompleteMultipart: objectURL + "?Signature=CompleteSignature",
+ Deadline: testDeadline(),
+ }
+
+ osStub.InitiateMultipartUpload(objectPath)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
+ require.Nil(t, fh)
+ require.Error(t, err)
+ require.EqualError(t, err, test.MultipartUploadInternalError().Error())
+}
+
+func TestUploadRemoteFileWithLimit(t *testing.T) {
+ testhelper.ConfigureSecret()
+
+ type remote int
+ const (
+ notRemote remote = iota
+ remoteSingle
+ remoteMultipart
+ )
+
+ remoteTypes := []remote{remoteSingle, remoteMultipart}
+
+ tests := []struct {
+ name string
+ objectSize int64
+ maxSize int64
+ expectedErr error
+ testData string
+ }{
+ {
+ name: "known size with no limit",
+ testData: test.ObjectContent,
+ objectSize: test.ObjectSize,
+ },
+ {
+ name: "unknown size with no limit",
+ testData: test.ObjectContent,
+ objectSize: -1,
+ },
+ {
+ name: "unknown object size with limit",
+ testData: test.ObjectContent,
+ objectSize: -1,
+ maxSize: test.ObjectSize - 1,
+ expectedErr: destination.ErrEntityTooLarge,
+ },
+ {
+ name: "large object with unknown size with limit",
+ testData: string(make([]byte, 20000)),
+ objectSize: -1,
+ maxSize: 19000,
+ expectedErr: destination.ErrEntityTooLarge,
+ },
+ }
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ var opts destination.UploadOpts
+
+ for _, remoteType := range remoteTypes {
+ tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp")
+ require.NoError(t, err)
+ defer os.RemoveAll(tmpFolder)
+
+ osStub, ts := test.StartObjectStore()
+ defer ts.Close()
+
+ switch remoteType {
+ case remoteSingle:
+ objectURL := ts.URL + test.ObjectPath
+
+ opts.RemoteID = "test-file"
+ opts.RemoteURL = objectURL
+ opts.PresignedPut = objectURL + "?Signature=ASignature"
+ opts.PresignedDelete = objectURL + "?Signature=AnotherSignature"
+ opts.Deadline = testDeadline()
+ opts.MaximumSize = tc.maxSize
+ case remoteMultipart:
+ objectURL := ts.URL + test.ObjectPath
+
+ opts.RemoteID = "test-file"
+ opts.RemoteURL = objectURL
+ opts.PresignedDelete = objectURL + "?Signature=AnotherSignature"
+ opts.PartSize = int64(len(tc.testData)/2) + 1
+ opts.PresignedParts = []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"}
+ opts.PresignedCompleteMultipart = objectURL + "?Signature=CompleteSignature"
+ opts.Deadline = testDeadline()
+ opts.MaximumSize = tc.maxSize
+
+ require.Less(t, int64(len(tc.testData)), int64(len(opts.PresignedParts))*opts.PartSize, "check part size calculation")
+
+ osStub.InitiateMultipartUpload(test.ObjectPath)
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ fh, err := destination.Upload(ctx, strings.NewReader(tc.testData), tc.objectSize, &opts)
+
+ if tc.expectedErr == nil {
+ require.NoError(t, err)
+ require.NotNil(t, fh)
+ } else {
+ require.True(t, errors.Is(err, tc.expectedErr))
+ require.Nil(t, fh)
+ }
+ }
+ })
+ }
+}
+
+func checkFileHandlerWithFields(t *testing.T, fh *destination.FileHandler, fields map[string]string, prefix string) {
+ key := func(field string) string {
+ if prefix == "" {
+ return field
+ }
+
+ return fmt.Sprintf("%s.%s", prefix, field)
+ }
+
+ require.Equal(t, fh.Name, fields[key("name")])
+ require.Equal(t, fh.LocalPath, fields[key("path")])
+ require.Equal(t, fh.RemoteURL, fields[key("remote_url")])
+ require.Equal(t, fh.RemoteID, fields[key("remote_id")])
+ require.Equal(t, strconv.FormatInt(test.ObjectSize, 10), fields[key("size")])
+ require.Equal(t, test.ObjectMD5, fields[key("md5")])
+ require.Equal(t, test.ObjectSHA1, fields[key("sha1")])
+ require.Equal(t, test.ObjectSHA256, fields[key("sha256")])
+ require.Equal(t, test.ObjectSHA512, fields[key("sha512")])
+ require.NotEmpty(t, fields[key("upload_duration")])
+}
diff --git a/workhorse/internal/upload/destination/filestore/filestore.go b/workhorse/internal/upload/destination/filestore/filestore.go
new file mode 100644
index 00000000000..2d88874bf25
--- /dev/null
+++ b/workhorse/internal/upload/destination/filestore/filestore.go
@@ -0,0 +1,21 @@
+// The filestore package has a consumer specific to uploading to local disk storage.
+package filestore
+
+import (
+ "context"
+ "io"
+ "time"
+)
+
+type LocalFile struct {
+ File io.WriteCloser
+}
+
+func (lf *LocalFile) Consume(_ context.Context, r io.Reader, _ time.Time) (int64, error) {
+ n, err := io.Copy(lf.File, r)
+ errClose := lf.File.Close()
+ if err == nil {
+ err = errClose
+ }
+ return n, err
+}
diff --git a/workhorse/internal/upload/destination/filestore/filestore_test.go b/workhorse/internal/upload/destination/filestore/filestore_test.go
new file mode 100644
index 00000000000..ec67eae96b9
--- /dev/null
+++ b/workhorse/internal/upload/destination/filestore/filestore_test.go
@@ -0,0 +1,38 @@
+package filestore
+
+import (
+ "context"
+ "io/ioutil"
+ "os"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestConsume(t *testing.T) {
+ f, err := ioutil.TempFile("", "filestore-local-file")
+ if f != nil {
+ defer os.Remove(f.Name())
+ }
+ require.NoError(t, err)
+ defer f.Close()
+
+ localFile := &LocalFile{File: f}
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ content := "file content"
+ reader := strings.NewReader(content)
+ var deadline time.Time
+
+ n, err := localFile.Consume(ctx, reader, deadline)
+ require.NoError(t, err)
+ require.Equal(t, int64(len(content)), n)
+
+ consumedContent, err := ioutil.ReadFile(f.Name())
+ require.NoError(t, err)
+ require.Equal(t, content, string(consumedContent))
+}
diff --git a/workhorse/internal/upload/destination/multi_hash.go b/workhorse/internal/upload/destination/multi_hash.go
new file mode 100644
index 00000000000..7d4884af3dc
--- /dev/null
+++ b/workhorse/internal/upload/destination/multi_hash.go
@@ -0,0 +1,48 @@
+package destination
+
+import (
+ "crypto/md5"
+ "crypto/sha1"
+ "crypto/sha256"
+ "crypto/sha512"
+ "encoding/hex"
+ "hash"
+ "io"
+)
+
+var hashFactories = map[string]func() hash.Hash{
+ "md5": md5.New,
+ "sha1": sha1.New,
+ "sha256": sha256.New,
+ "sha512": sha512.New,
+}
+
+type multiHash struct {
+ io.Writer
+ hashes map[string]hash.Hash
+}
+
+func newMultiHash() (m *multiHash) {
+ m = &multiHash{}
+ m.hashes = make(map[string]hash.Hash)
+
+ var writers []io.Writer
+ for hash, hashFactory := range hashFactories {
+ writer := hashFactory()
+
+ m.hashes[hash] = writer
+ writers = append(writers, writer)
+ }
+
+ m.Writer = io.MultiWriter(writers...)
+ return m
+}
+
+func (m *multiHash) finish() map[string]string {
+ h := make(map[string]string)
+ for hashName, hash := range m.hashes {
+ checksum := hash.Sum(nil)
+ h[hashName] = hex.EncodeToString(checksum)
+ }
+ return h
+}
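A minimal sketch of how the multiHash above can be driven inside the destination package: tee the upload stream through it while draining, then collect the hex digests. The helper name and the discard sink are illustrative only, not part of this diff.

func hashStreamSketch(r io.Reader) (map[string]string, error) {
	m := newMultiHash()
	// Every byte read from r is also written to the MD5/SHA hashers.
	if _, err := io.Copy(ioutil.Discard, io.TeeReader(r, m)); err != nil {
		return nil, err
	}
	return m.finish(), nil // e.g. result["md5"], result["sha256"], ...
}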
diff --git a/workhorse/internal/upload/destination/objectstore/doc.go b/workhorse/internal/upload/destination/objectstore/doc.go
new file mode 100644
index 00000000000..00f98f230ec
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/doc.go
@@ -0,0 +1,3 @@
+// The objectstore package consists of a number of consumers specific to uploading
+// to object storage providers (e.g. AWS S3, Google Cloud Storage, Azure Blob Storage).
+package objectstore
diff --git a/workhorse/internal/upload/destination/objectstore/gocloud_object.go b/workhorse/internal/upload/destination/objectstore/gocloud_object.go
new file mode 100644
index 00000000000..38545086994
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/gocloud_object.go
@@ -0,0 +1,100 @@
+package objectstore
+
+import (
+ "context"
+ "io"
+ "time"
+
+ "gitlab.com/gitlab-org/labkit/log"
+ "gocloud.dev/blob"
+ "gocloud.dev/gcerrors"
+)
+
+type GoCloudObject struct {
+ bucket *blob.Bucket
+ mux *blob.URLMux
+ bucketURL string
+ objectName string
+ *uploader
+}
+
+type GoCloudObjectParams struct {
+ Ctx context.Context
+ Mux *blob.URLMux
+ BucketURL string
+ ObjectName string
+}
+
+func NewGoCloudObject(p *GoCloudObjectParams) (*GoCloudObject, error) {
+ bucket, err := p.Mux.OpenBucket(p.Ctx, p.BucketURL)
+ if err != nil {
+ return nil, err
+ }
+
+ o := &GoCloudObject{
+ bucket: bucket,
+ mux: p.Mux,
+ bucketURL: p.BucketURL,
+ objectName: p.ObjectName,
+ }
+
+ o.uploader = newUploader(o)
+ return o, nil
+}
+
+func (o *GoCloudObject) Upload(ctx context.Context, r io.Reader) error {
+ defer o.bucket.Close()
+
+ writer, err := o.bucket.NewWriter(ctx, o.objectName, nil)
+ if err != nil {
+ log.ContextLogger(ctx).WithError(err).Error("error creating GoCloud bucket")
+ return err
+ }
+
+ if _, err = io.Copy(writer, r); err != nil {
+ log.ContextLogger(ctx).WithError(err).Error("error writing to GoCloud bucket")
+ writer.Close()
+ return err
+ }
+
+ if err := writer.Close(); err != nil {
+ log.ContextLogger(ctx).WithError(err).Error("error closing GoCloud bucket")
+ return err
+ }
+
+ return nil
+}
+
+func (o *GoCloudObject) ETag() string {
+ return ""
+}
+
+func (o *GoCloudObject) Abort() {
+ o.Delete()
+}
+
+// Delete will always attempt to delete the temporary file.
+// According to https://github.com/google/go-cloud/blob/7818961b5c9a112f7e092d3a2d8479cbca80d187/blob/azureblob/azureblob.go#L881-L883,
+// if the writer is closed before any Write is called, Close will create an empty file.
+func (o *GoCloudObject) Delete() {
+ if o.bucketURL == "" || o.objectName == "" {
+ return
+ }
+
+ // Note we can't use the request context because in a successful
+ // case, the original request has already completed.
+ deleteCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // lint:allow context.Background
+ defer cancel()
+
+ bucket, err := o.mux.OpenBucket(deleteCtx, o.bucketURL)
+ if err != nil {
+ log.WithError(err).Error("error opening bucket for delete")
+ return
+ }
+
+ if err := bucket.Delete(deleteCtx, o.objectName); err != nil {
+ if gcerrors.Code(err) != gcerrors.NotFound {
+ log.WithError(err).Error("error deleting object")
+ }
+ }
+}
diff --git a/workhorse/internal/upload/destination/objectstore/gocloud_object_test.go b/workhorse/internal/upload/destination/objectstore/gocloud_object_test.go
new file mode 100644
index 00000000000..57b3a35b41e
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/gocloud_object_test.go
@@ -0,0 +1,56 @@
+package objectstore_test
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
+)
+
+func TestGoCloudObjectUpload(t *testing.T) {
+ mux, _, cleanup := test.SetupGoCloudFileBucket(t, "azuretest")
+ defer cleanup()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ deadline := time.Now().Add(testTimeout)
+
+ objectName := "test.png"
+ testURL := "azuretest://azure.example.com/test-container"
+ p := &objectstore.GoCloudObjectParams{Ctx: ctx, Mux: mux, BucketURL: testURL, ObjectName: objectName}
+ object, err := objectstore.NewGoCloudObject(p)
+ require.NotNil(t, object)
+ require.NoError(t, err)
+
+ // copy data
+ n, err := object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline)
+ require.NoError(t, err)
+ require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch")
+
+ bucket, err := mux.OpenBucket(ctx, testURL)
+ require.NoError(t, err)
+
+ // Verify the data was copied correctly.
+ received, err := bucket.ReadAll(ctx, objectName)
+ require.NoError(t, err)
+ require.Equal(t, []byte(test.ObjectContent), received)
+
+ cancel()
+
+ testhelper.Retry(t, 5*time.Second, func() error {
+ exists, err := bucket.Exists(ctx, objectName)
+ require.NoError(t, err)
+
+ if exists {
+ return fmt.Errorf("file %s is still present", objectName)
+ } else {
+ return nil
+ }
+ })
+}
diff --git a/workhorse/internal/upload/destination/objectstore/multipart.go b/workhorse/internal/upload/destination/objectstore/multipart.go
new file mode 100644
index 00000000000..4c5b64b27ee
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/multipart.go
@@ -0,0 +1,187 @@
+package objectstore
+
+import (
+ "bytes"
+ "context"
+ "encoding/xml"
+ "errors"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "os"
+
+ "gitlab.com/gitlab-org/labkit/mask"
+)
+
+// ErrNotEnoughParts is returned when the reader supplies more than partSize * len(PartURLs) bytes
+var ErrNotEnoughParts = errors.New("not enough Parts")
+
+// Multipart represents a MultipartUpload on an S3-compatible object store service.
+// It can be used as an io.WriteCloser for uploading an object.
+type Multipart struct {
+ PartURLs []string
+ // CompleteURL is a presigned URL for CompleteMultipartUpload
+ CompleteURL string
+ // AbortURL is a presigned URL for AbortMultipartUpload
+ AbortURL string
+ // DeleteURL is a presigned URL for RemoveObject
+ DeleteURL string
+ PutHeaders map[string]string
+ partSize int64
+ etag string
+
+ *uploader
+}
+
+// NewMultipart returns a Multipart pointer that can be used for uploading. Data written will be buffered
+// on disk in chunks of up to partSize bytes, then uploaded with S3 Upload Part. Once the Multipart is closed,
+// a final call to CompleteMultipartUpload is sent. In case of any error, AbortMultipartUpload is called to clean up all resources.
+func NewMultipart(partURLs []string, completeURL, abortURL, deleteURL string, putHeaders map[string]string, partSize int64) (*Multipart, error) {
+ m := &Multipart{
+ PartURLs: partURLs,
+ CompleteURL: completeURL,
+ AbortURL: abortURL,
+ DeleteURL: deleteURL,
+ PutHeaders: putHeaders,
+ partSize: partSize,
+ }
+
+ m.uploader = newUploader(m)
+ return m, nil
+}
+
+func (m *Multipart) Upload(ctx context.Context, r io.Reader) error {
+ cmu := &CompleteMultipartUpload{}
+ for i, partURL := range m.PartURLs {
+ src := io.LimitReader(r, m.partSize)
+ part, err := m.readAndUploadOnePart(ctx, partURL, m.PutHeaders, src, i+1)
+ if err != nil {
+ return err
+ }
+ if part == nil {
+ break
+ } else {
+ cmu.Part = append(cmu.Part, part)
+ }
+ }
+
+ n, err := io.Copy(ioutil.Discard, r)
+ if err != nil {
+ return fmt.Errorf("drain pipe: %v", err)
+ }
+ if n > 0 {
+ return ErrNotEnoughParts
+ }
+
+ if err := m.complete(ctx, cmu); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (m *Multipart) ETag() string {
+ return m.etag
+}
+func (m *Multipart) Abort() {
+ deleteURL(m.AbortURL)
+}
+
+func (m *Multipart) Delete() {
+ deleteURL(m.DeleteURL)
+}
+
+func (m *Multipart) readAndUploadOnePart(ctx context.Context, partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) {
+ file, err := ioutil.TempFile("", "part-buffer")
+ if err != nil {
+ return nil, fmt.Errorf("create temporary buffer file: %v", err)
+ }
+ defer file.Close()
+
+ if err := os.Remove(file.Name()); err != nil {
+ return nil, err
+ }
+
+ n, err := io.Copy(file, src)
+ if err != nil {
+ return nil, err
+ }
+ if n == 0 {
+ return nil, nil
+ }
+
+ if _, err = file.Seek(0, io.SeekStart); err != nil {
+ return nil, fmt.Errorf("rewind part %d temporary dump : %v", partNumber, err)
+ }
+
+ etag, err := m.uploadPart(ctx, partURL, putHeaders, file, n)
+ if err != nil {
+ return nil, fmt.Errorf("upload part %d: %v", partNumber, err)
+ }
+ return &completeMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil
+}
+
+func (m *Multipart) uploadPart(ctx context.Context, url string, headers map[string]string, body io.Reader, size int64) (string, error) {
+ deadline, ok := ctx.Deadline()
+ if !ok {
+ return "", fmt.Errorf("missing deadline")
+ }
+
+ part, err := newObject(url, "", headers, size, false)
+ if err != nil {
+ return "", err
+ }
+
+ if n, err := part.Consume(ctx, io.LimitReader(body, size), deadline); err != nil || n < size {
+ if err == nil {
+ err = io.ErrUnexpectedEOF
+ }
+ return "", err
+ }
+
+ return part.ETag(), nil
+}
+
+func (m *Multipart) complete(ctx context.Context, cmu *CompleteMultipartUpload) error {
+ body, err := xml.Marshal(cmu)
+ if err != nil {
+ return fmt.Errorf("marshal CompleteMultipartUpload request: %v", err)
+ }
+
+ req, err := http.NewRequest("POST", m.CompleteURL, bytes.NewReader(body))
+ if err != nil {
+ return fmt.Errorf("create CompleteMultipartUpload request: %v", err)
+ }
+ req.ContentLength = int64(len(body))
+ req.Header.Set("Content-Type", "application/xml")
+ req = req.WithContext(ctx)
+
+ resp, err := httpClient.Do(req)
+ if err != nil {
+ return fmt.Errorf("CompleteMultipartUpload request %q: %v", mask.URL(m.CompleteURL), err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return fmt.Errorf("CompleteMultipartUpload request %v returned: %s", mask.URL(m.CompleteURL), resp.Status)
+ }
+
+ result := &compoundCompleteMultipartUploadResult{}
+ decoder := xml.NewDecoder(resp.Body)
+ if err := decoder.Decode(&result); err != nil {
+ return fmt.Errorf("decode CompleteMultipartUpload answer: %v", err)
+ }
+
+ if result.isError() {
+ return result
+ }
+
+ if result.CompleteMultipartUploadResult == nil {
+ return fmt.Errorf("empty CompleteMultipartUploadResult")
+ }
+
+ m.etag = extractETag(result.ETag)
+
+ return nil
+}
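Because Upload drains the reader after the final part and returns ErrNotEnoughParts if any bytes remain, a caller must presign at least ceil(objectSize / partSize) part URLs. A hedged sketch of that invariant; requiredParts is an illustrative helper, not part of this package:

func requiredParts(objectSize, partSize int64) int64 {
	// Ceiling division: the last part may be smaller than partSize.
	return (objectSize + partSize - 1) / partSize
}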
diff --git a/workhorse/internal/upload/destination/objectstore/multipart_test.go b/workhorse/internal/upload/destination/objectstore/multipart_test.go
new file mode 100644
index 00000000000..4aff3467e30
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/multipart_test.go
@@ -0,0 +1,64 @@
+package objectstore_test
+
+import (
+ "context"
+ "io/ioutil"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
+)
+
+func TestMultipartUploadWithUpcaseETags(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ var putCnt, postCnt int
+
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ _, err := ioutil.ReadAll(r.Body)
+ require.NoError(t, err)
+ defer r.Body.Close()
+
+ // Part upload request
+ if r.Method == "PUT" {
+ putCnt++
+
+ w.Header().Set("ETag", strings.ToUpper(test.ObjectMD5))
+ }
+
+ // POST with CompleteMultipartUpload request
+ if r.Method == "POST" {
+ completeBody := `<CompleteMultipartUploadResult>
+ <Bucket>test-bucket</Bucket>
+ <ETag>No Longer Checked</ETag>
+ </CompleteMultipartUploadResult>`
+ postCnt++
+
+ w.Write([]byte(completeBody))
+ }
+ }))
+ defer ts.Close()
+
+ deadline := time.Now().Add(testTimeout)
+
+ m, err := objectstore.NewMultipart(
+ []string{ts.URL}, // a single presigned part URL
+ ts.URL, // the complete multipart upload URL
+ "", // no abort
+ "", // no delete
+ map[string]string{}, // no custom headers
+ test.ObjectSize) // parts size equal to the whole content. Only 1 part
+ require.NoError(t, err)
+
+ _, err = m.Consume(ctx, strings.NewReader(test.ObjectContent), deadline)
+ require.NoError(t, err)
+ require.Equal(t, 1, putCnt, "1 part expected")
+ require.Equal(t, 1, postCnt, "1 complete multipart upload expected")
+}
diff --git a/workhorse/internal/upload/destination/objectstore/object.go b/workhorse/internal/upload/destination/objectstore/object.go
new file mode 100644
index 00000000000..b7c4f12f009
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/object.go
@@ -0,0 +1,95 @@
+package objectstore
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+
+ "gitlab.com/gitlab-org/labkit/mask"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper/httptransport"
+)
+
+var httpClient = &http.Client{
+ Transport: httptransport.New(),
+}
+
+// Object represents an object on an S3-compatible object store service.
+// It can be used as an io.WriteCloser for uploading an object.
+type Object struct {
+ // putURL is a presigned URL for PutObject
+ putURL string
+ // deleteURL is a presigned URL for RemoveObject
+ deleteURL string
+ putHeaders map[string]string
+ size int64
+ etag string
+ metrics bool
+
+ *uploader
+}
+
+type StatusCodeError error
+
+// NewObject opens an HTTP connection to Object Store and returns an Object pointer that can be used for uploading.
+func NewObject(putURL, deleteURL string, putHeaders map[string]string, size int64) (*Object, error) {
+ return newObject(putURL, deleteURL, putHeaders, size, true)
+}
+
+func newObject(putURL, deleteURL string, putHeaders map[string]string, size int64, metrics bool) (*Object, error) {
+ o := &Object{
+ putURL: putURL,
+ deleteURL: deleteURL,
+ putHeaders: putHeaders,
+ size: size,
+ metrics: metrics,
+ }
+
+ o.uploader = newETagCheckUploader(o, metrics)
+ return o, nil
+}
+
+func (o *Object) Upload(ctx context.Context, r io.Reader) error {
+ // wrap r in a NopCloser so the HTTP client cannot close it: closing the upstream pipe reader here would shadow an error set with CloseWithError(err)
+ req, err := http.NewRequest(http.MethodPut, o.putURL, ioutil.NopCloser(r))
+
+ if err != nil {
+ return fmt.Errorf("PUT %q: %v", mask.URL(o.putURL), err)
+ }
+ req.ContentLength = o.size
+
+ for k, v := range o.putHeaders {
+ req.Header.Set(k, v)
+ }
+
+ resp, err := httpClient.Do(req)
+ if err != nil {
+ return fmt.Errorf("PUT request %q: %v", mask.URL(o.putURL), err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ if o.metrics {
+ objectStorageUploadRequestsInvalidStatus.Inc()
+ }
+ return StatusCodeError(fmt.Errorf("PUT request %v returned: %s", mask.URL(o.putURL), resp.Status))
+ }
+
+ o.etag = extractETag(resp.Header.Get("ETag"))
+
+ return nil
+}
+
+func (o *Object) ETag() string {
+ return o.etag
+}
+
+func (o *Object) Abort() {
+ o.Delete()
+}
+
+func (o *Object) Delete() {
+ deleteURL(o.deleteURL)
+}
diff --git a/workhorse/internal/upload/destination/objectstore/object_test.go b/workhorse/internal/upload/destination/objectstore/object_test.go
new file mode 100644
index 00000000000..24117891b6d
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/object_test.go
@@ -0,0 +1,149 @@
+package objectstore_test
+
+import (
+ "context"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
+)
+
+const testTimeout = 10 * time.Second
+
+type osFactory func() (*test.ObjectstoreStub, *httptest.Server)
+
+func testObjectUploadNoErrors(t *testing.T, startObjectStore osFactory, useDeleteURL bool, contentType string) {
+ osStub, ts := startObjectStore()
+ defer ts.Close()
+
+ objectURL := ts.URL + test.ObjectPath
+ var deleteURL string
+ if useDeleteURL {
+ deleteURL = objectURL
+ }
+
+ putHeaders := map[string]string{"Content-Type": contentType}
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ deadline := time.Now().Add(testTimeout)
+ object, err := objectstore.NewObject(objectURL, deleteURL, putHeaders, test.ObjectSize)
+ require.NoError(t, err)
+
+ // copy data
+ n, err := object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline)
+ require.NoError(t, err)
+ require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch")
+
+ require.Equal(t, contentType, osStub.GetHeader(test.ObjectPath, "Content-Type"))
+
+ // Checking MD5 extraction
+ require.Equal(t, osStub.GetObjectMD5(test.ObjectPath), object.ETag())
+
+ // Checking cleanup
+ cancel()
+ require.Equal(t, 1, osStub.PutsCnt(), "Object hasn't been uploaded")
+
+ var expectedDeleteCnt int
+ if useDeleteURL {
+ expectedDeleteCnt = 1
+ }
+ require.Eventually(t, func() bool { return osStub.DeletesCnt() == expectedDeleteCnt }, time.Second, time.Millisecond)
+
+ if useDeleteURL {
+ require.Equal(t, 1, osStub.DeletesCnt(), "Object hasn't been deleted")
+ } else {
+ require.Equal(t, 0, osStub.DeletesCnt(), "Object has been deleted")
+ }
+}
+
+func TestObjectUpload(t *testing.T) {
+ t.Run("with delete URL", func(t *testing.T) {
+ testObjectUploadNoErrors(t, test.StartObjectStore, true, "application/octet-stream")
+ })
+ t.Run("without delete URL", func(t *testing.T) {
+ testObjectUploadNoErrors(t, test.StartObjectStore, false, "application/octet-stream")
+ })
+ t.Run("with custom content type", func(t *testing.T) {
+ testObjectUploadNoErrors(t, test.StartObjectStore, false, "image/jpeg")
+ })
+ t.Run("with upcase ETAG", func(t *testing.T) {
+ factory := func() (*test.ObjectstoreStub, *httptest.Server) {
+ md5s := map[string]string{
+ test.ObjectPath: strings.ToUpper(test.ObjectMD5),
+ }
+
+ return test.StartObjectStoreWithCustomMD5(md5s)
+ }
+
+ testObjectUploadNoErrors(t, factory, false, "application/octet-stream")
+ })
+}
+
+func TestObjectUpload404(t *testing.T) {
+ ts := httptest.NewServer(http.NotFoundHandler())
+ defer ts.Close()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ deadline := time.Now().Add(testTimeout)
+ objectURL := ts.URL + test.ObjectPath
+ object, err := objectstore.NewObject(objectURL, "", map[string]string{}, test.ObjectSize)
+ require.NoError(t, err)
+ _, err = object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline)
+
+ require.Error(t, err)
+ _, isStatusCodeError := err.(objectstore.StatusCodeError)
+ require.True(t, isStatusCodeError, "Should fail with StatusCodeError")
+ require.Contains(t, err.Error(), "404")
+}
+
+type endlessReader struct{}
+
+func (e *endlessReader) Read(p []byte) (n int, err error) {
+ for i := 0; i < len(p); i++ {
+ p[i] = '*'
+ }
+
+ return len(p), nil
+}
+
+// TestObjectUploadBrokenConnection ensures that errors caused by the upload destination are propagated back correctly.
+// This is important for troubleshooting in production.
+func TestObjectUploadBrokenConnection(t *testing.T) {
+ // This test server closes connection immediately
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ hj, ok := w.(http.Hijacker)
+ if !ok {
+ require.FailNow(t, "webserver doesn't support hijacking")
+ }
+ conn, _, err := hj.Hijack()
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ conn.Close()
+ }))
+ defer ts.Close()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ deadline := time.Now().Add(testTimeout)
+ objectURL := ts.URL + test.ObjectPath
+ object, err := objectstore.NewObject(objectURL, "", map[string]string{}, -1)
+ require.NoError(t, err)
+
+ _, copyErr := object.Consume(ctx, &endlessReader{}, deadline)
+ require.Error(t, copyErr)
+ require.NotEqual(t, io.ErrClosedPipe, copyErr, "We are shadowing the real error")
+}
diff --git a/workhorse/internal/upload/destination/objectstore/prometheus.go b/workhorse/internal/upload/destination/objectstore/prometheus.go
new file mode 100644
index 00000000000..20762fb52bc
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/prometheus.go
@@ -0,0 +1,39 @@
+package objectstore
+
+import (
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+var (
+ objectStorageUploadRequests = promauto.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitlab_workhorse_object_storage_upload_requests",
+ Help: "How many object storage requests have been processed",
+ },
+ []string{"status"},
+ )
+ objectStorageUploadsOpen = promauto.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "gitlab_workhorse_object_storage_upload_open",
+ Help: "Describes many object storage requests are open now",
+ },
+ )
+ objectStorageUploadBytes = promauto.NewCounter(
+ prometheus.CounterOpts{
+ Name: "gitlab_workhorse_object_storage_upload_bytes",
+ Help: "How many bytes were sent to object storage",
+ },
+ )
+ objectStorageUploadTime = promauto.NewHistogram(
+ prometheus.HistogramOpts{
+ Name: "gitlab_workhorse_object_storage_upload_time",
+ Help: "How long it took to upload objects",
+ Buckets: objectStorageUploadTimeBuckets,
+ })
+
+ objectStorageUploadRequestsRequestFailed = objectStorageUploadRequests.WithLabelValues("request-failed")
+ objectStorageUploadRequestsInvalidStatus = objectStorageUploadRequests.WithLabelValues("invalid-status")
+
+ objectStorageUploadTimeBuckets = []float64{.1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100}
+)
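For context, this is roughly how an upload path would drive the metrics above. The wiring shown is an assumption for illustration (the real instrumentation lives in uploader.go), and instrumentedUpload is a made-up name; it assumes a "time" import.

func instrumentedUpload(upload func() (int64, error)) error {
	objectStorageUploadsOpen.Inc()
	defer objectStorageUploadsOpen.Dec()

	start := time.Now()
	n, err := upload()
	objectStorageUploadTime.Observe(time.Since(start).Seconds())
	if err != nil {
		objectStorageUploadRequestsRequestFailed.Inc()
		return err
	}
	objectStorageUploadBytes.Add(float64(n))
	return nil
}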
diff --git a/workhorse/internal/upload/destination/objectstore/s3_complete_multipart_api.go b/workhorse/internal/upload/destination/objectstore/s3_complete_multipart_api.go
new file mode 100644
index 00000000000..b84f5757f49
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/s3_complete_multipart_api.go
@@ -0,0 +1,51 @@
+package objectstore
+
+import (
+ "encoding/xml"
+ "fmt"
+)
+
+// CompleteMultipartUpload is the S3 CompleteMultipartUpload body
+type CompleteMultipartUpload struct {
+ Part []*completeMultipartUploadPart
+}
+
+type completeMultipartUploadPart struct {
+ PartNumber int
+ ETag string
+}
+
+// CompleteMultipartUploadResult is the S3 answer to CompleteMultipartUpload request
+type CompleteMultipartUploadResult struct {
+ Location string
+ Bucket string
+ Key string
+ ETag string
+}
+
+// CompleteMultipartUploadError is the in-body error structure
+// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html#mpUploadComplete-examples
+// the answer contains other fields we are not using
+type CompleteMultipartUploadError struct {
+ XMLName xml.Name `xml:"Error"`
+ Code string
+ Message string
+}
+
+func (c *CompleteMultipartUploadError) Error() string {
+ return fmt.Sprintf("CompleteMultipartUpload remote error %q: %s", c.Code, c.Message)
+}
+
+// compoundCompleteMultipartUploadResult holds both CompleteMultipartUploadResult and CompleteMultipartUploadError.
+// This allows us to deserialize a response body whose root element is either Error or CompleteMultipartUploadResult.
+type compoundCompleteMultipartUploadResult struct {
+ *CompleteMultipartUploadResult
+ *CompleteMultipartUploadError
+
+ // XMLName overrides the CompleteMultipartUploadError.XMLName tag
+ XMLName xml.Name
+}
+
+func (c *compoundCompleteMultipartUploadResult) isError() bool {
+ return c.CompleteMultipartUploadError != nil
+}
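A sketch of why the compound struct exists: a single decode call accepts either root element, and isError selects the failure path. This mirrors the handling in multipart.go's complete; decodeCompleteResponse itself is a hypothetical helper inside the objectstore package, assuming "io" and "encoding/xml" imports.

func decodeCompleteResponse(body io.Reader) (*CompleteMultipartUploadResult, error) {
	result := &compoundCompleteMultipartUploadResult{}
	if err := xml.NewDecoder(body).Decode(result); err != nil {
		return nil, err
	}
	if result.isError() {
		// The root element was <Error>; the compound value satisfies the
		// error interface via the embedded CompleteMultipartUploadError.
		return nil, result
	}
	return result.CompleteMultipartUploadResult, nil
}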
diff --git a/workhorse/internal/upload/destination/objectstore/s3_object.go b/workhorse/internal/upload/destination/objectstore/s3_object.go
new file mode 100644
index 00000000000..ce0cccc7eb1
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/s3_object.go
@@ -0,0 +1,119 @@
+package objectstore
+
+import (
+ "context"
+ "io"
+ "time"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/awserr"
+ "github.com/aws/aws-sdk-go/service/s3"
+ "github.com/aws/aws-sdk-go/service/s3/s3manager"
+ "gitlab.com/gitlab-org/labkit/log"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
+)
+
+type S3Object struct {
+ credentials config.S3Credentials
+ config config.S3Config
+ objectName string
+ uploaded bool
+
+ *uploader
+}
+
+func NewS3Object(objectName string, s3Credentials config.S3Credentials, s3Config config.S3Config) (*S3Object, error) {
+ o := &S3Object{
+ credentials: s3Credentials,
+ config: s3Config,
+ objectName: objectName,
+ }
+
+ o.uploader = newUploader(o)
+ return o, nil
+}
+
+func setEncryptionOptions(input *s3manager.UploadInput, s3Config config.S3Config) {
+ if s3Config.ServerSideEncryption != "" {
+ input.ServerSideEncryption = aws.String(s3Config.ServerSideEncryption)
+
+ if s3Config.ServerSideEncryption == s3.ServerSideEncryptionAwsKms && s3Config.SSEKMSKeyID != "" {
+ input.SSEKMSKeyId = aws.String(s3Config.SSEKMSKeyID)
+ }
+ }
+}
+
+func (s *S3Object) Upload(ctx context.Context, r io.Reader) error {
+ sess, err := setupS3Session(s.credentials, s.config)
+ if err != nil {
+ log.WithError(err).Error("error creating S3 session")
+ return err
+ }
+
+ uploader := s3manager.NewUploader(sess)
+
+ input := &s3manager.UploadInput{
+ Bucket: aws.String(s.config.Bucket),
+ Key: aws.String(s.objectName),
+ Body: r,
+ }
+
+ setEncryptionOptions(input, s.config)
+
+ _, err = uploader.UploadWithContext(ctx, input)
+ if err != nil {
+ log.WithError(err).Error("error uploading S3 session")
+ // Get the root cause, such as ErrEntityTooLarge, so we can return the proper HTTP status code
+ return unwrapAWSError(err)
+ }
+
+ s.uploaded = true
+
+ return nil
+}
+
+func (s *S3Object) ETag() string {
+ return ""
+}
+
+func (s *S3Object) Abort() {
+ s.Delete()
+}
+
+func (s *S3Object) Delete() {
+ if !s.uploaded {
+ return
+ }
+
+ session, err := setupS3Session(s.credentials, s.config)
+ if err != nil {
+ log.WithError(err).Error("error setting up S3 session in delete")
+ return
+ }
+
+ svc := s3.New(session)
+ input := &s3.DeleteObjectInput{
+ Bucket: aws.String(s.config.Bucket),
+ Key: aws.String(s.objectName),
+ }
+
+ // Note we can't use the request context because in a successful
+ // case, the original request has already completed.
+ deleteCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // lint:allow context.Background
+ defer cancel()
+
+ _, err = svc.DeleteObjectWithContext(deleteCtx, input)
+ if err != nil {
+ log.WithError(err).Error("error deleting S3 object", err)
+ }
+}
+
+// This is needed until https://github.com/aws/aws-sdk-go/issues/2820 is closed.
+func unwrapAWSError(e error) error {
+ if awsErr, ok := e.(awserr.Error); ok {
+ return unwrapAWSError(awsErr.OrigErr())
+ }
+
+ return e
+}
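To illustrate unwrapAWSError, a small example of peeling nested awserr values down to the root cause; the codes, messages, and nesting here are made up:

root := errors.New("entity is too large")
wrapped := awserr.New("MultipartUpload", "upload failed",
	awserr.New("Put", "part failed", root))

// unwrapAWSError recurses through OrigErr() until the value no longer
// implements awserr.Error, so the root cause comes back unchanged.
fmt.Println(unwrapAWSError(wrapped) == root) // true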
diff --git a/workhorse/internal/upload/destination/objectstore/s3_object_test.go b/workhorse/internal/upload/destination/objectstore/s3_object_test.go
new file mode 100644
index 00000000000..b81b0ae2024
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/s3_object_test.go
@@ -0,0 +1,174 @@
+package objectstore_test
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/aws/aws-sdk-go/aws/awserr"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "github.com/aws/aws-sdk-go/service/s3"
+ "github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
+)
+
+type failedReader struct {
+ io.Reader
+}
+
+func (r *failedReader) Read(p []byte) (int, error) {
+ origErr := fmt.Errorf("entity is too large")
+ return 0, awserr.New("Read", "read failed", origErr)
+}
+
+func TestS3ObjectUpload(t *testing.T) {
+ testCases := []struct {
+ encryption string
+ }{
+ {encryption: ""},
+ {encryption: s3.ServerSideEncryptionAes256},
+ {encryption: s3.ServerSideEncryptionAwsKms},
+ }
+
+ for _, tc := range testCases {
+ t.Run(fmt.Sprintf("encryption=%v", tc.encryption), func(t *testing.T) {
+ creds, config, sess, ts := test.SetupS3(t, tc.encryption)
+ defer ts.Close()
+
+ deadline := time.Now().Add(testTimeout)
+ tmpDir, err := ioutil.TempDir("", "workhorse-test-")
+ require.NoError(t, err)
+ defer os.Remove(tmpDir)
+
+ objectName := filepath.Join(tmpDir, "s3-test-data")
+ ctx, cancel := context.WithCancel(context.Background())
+
+ object, err := objectstore.NewS3Object(objectName, creds, config)
+ require.NoError(t, err)
+
+ // copy data
+ n, err := object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline)
+ require.NoError(t, err)
+ require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch")
+
+ test.S3ObjectExists(t, sess, config, objectName, test.ObjectContent)
+ test.CheckS3Metadata(t, sess, config, objectName)
+
+ cancel()
+
+ testhelper.Retry(t, 5*time.Second, func() error {
+ if test.S3ObjectDoesNotExist(t, sess, config, objectName) {
+ return nil
+ }
+
+ return fmt.Errorf("file is still present")
+ })
+ })
+ }
+}
+
+func TestConcurrentS3ObjectUpload(t *testing.T) {
+ creds, uploadsConfig, uploadsSession, uploadServer := test.SetupS3WithBucket(t, "uploads", "")
+ defer uploadServer.Close()
+
+ // This will return a separate S3 endpoint
+ _, artifactsConfig, artifactsSession, artifactsServer := test.SetupS3WithBucket(t, "artifacts", "")
+ defer artifactsServer.Close()
+
+ deadline := time.Now().Add(testTimeout)
+ tmpDir, err := ioutil.TempDir("", "workhorse-test-")
+ require.NoError(t, err)
+ defer os.Remove(tmpDir)
+
+ var wg sync.WaitGroup
+
+ for i := 0; i < 4; i++ {
+ wg.Add(1)
+
+ go func(index int) {
+ var sess *session.Session
+ var config config.S3Config
+
+ if index%2 == 0 {
+ sess = uploadsSession
+ config = uploadsConfig
+ } else {
+ sess = artifactsSession
+ config = artifactsConfig
+ }
+
+ name := fmt.Sprintf("s3-test-data-%d", index)
+ objectName := filepath.Join(tmpDir, name)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ object, err := objectstore.NewS3Object(objectName, creds, config)
+ require.NoError(t, err)
+
+ // copy data
+ n, err := object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline)
+ require.NoError(t, err)
+ require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch")
+
+ test.S3ObjectExists(t, sess, config, objectName, test.ObjectContent)
+ wg.Done()
+ }(i)
+ }
+
+ wg.Wait()
+}
+
+func TestS3ObjectUploadCancel(t *testing.T) {
+ creds, config, _, ts := test.SetupS3(t, "")
+ defer ts.Close()
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ deadline := time.Now().Add(testTimeout)
+ tmpDir, err := ioutil.TempDir("", "workhorse-test-")
+ require.NoError(t, err)
+ defer os.Remove(tmpDir)
+
+ objectName := filepath.Join(tmpDir, "s3-test-data")
+
+ object, err := objectstore.NewS3Object(objectName, creds, config)
+
+ require.NoError(t, err)
+
+ // Cancel the transfer before the data has been copied to ensure
+ // we handle this gracefully.
+ cancel()
+
+ _, err = object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline)
+ require.Error(t, err)
+ require.Equal(t, "context canceled", err.Error())
+}
+
+func TestS3ObjectUploadLimitReached(t *testing.T) {
+ creds, config, _, ts := test.SetupS3(t, "")
+ defer ts.Close()
+
+ deadline := time.Now().Add(testTimeout)
+ tmpDir, err := ioutil.TempDir("", "workhorse-test-")
+ require.NoError(t, err)
+ defer os.Remove(tmpDir)
+
+ objectName := filepath.Join(tmpDir, "s3-test-data")
+ object, err := objectstore.NewS3Object(objectName, creds, config)
+ require.NoError(t, err)
+
+ _, err = object.Consume(context.Background(), &failedReader{}, deadline)
+ require.Error(t, err)
+ require.Equal(t, "entity is too large", err.Error())
+}
diff --git a/workhorse/internal/upload/destination/objectstore/s3_session.go b/workhorse/internal/upload/destination/objectstore/s3_session.go
new file mode 100644
index 00000000000..a0c1f099145
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/s3_session.go
@@ -0,0 +1,94 @@
+package objectstore
+
+import (
+ "sync"
+ "time"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/credentials"
+ "github.com/aws/aws-sdk-go/aws/session"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
+)
+
+type s3Session struct {
+ session *session.Session
+ expiry time.Time
+}
+
+type s3SessionCache struct {
+ // An S3 session is cached by its input configuration (e.g. region,
+ // endpoint, path style, etc.), but the bucket is actually
+ // determined by the type of object to be uploaded (e.g. CI
+ // artifact, LFS, etc.) during runtime. In practice, we should only
+ // need one session per Workhorse process if we only allow one
+ // configuration for many different buckets. However, using a map
+ // indexed by the config avoids potential pitfalls in case the
+ // bucket configuration is supplied at startup or we need to support
+ // multiple S3 endpoints.
+ sessions map[config.S3Config]*s3Session
+ sync.Mutex
+}
+
+func (s *s3Session) isExpired() bool {
+ return time.Now().After(s.expiry)
+}
+
+func newS3SessionCache() *s3SessionCache {
+ return &s3SessionCache{sessions: make(map[config.S3Config]*s3Session)}
+}
+
+var (
+ // By default, it looks like IAM instance profiles may last 6 hours
+ // (via curl http://169.254.169.254/latest/meta-data/iam/security-credentials/<role_name>),
+ // but this may be configurable from anywhere for 15 minutes to 12
+ // hours. To be safe, refresh AWS sessions every 10 minutes.
+ sessionExpiration = time.Duration(10 * time.Minute)
+ sessionCache = newS3SessionCache()
+)
+
+// setupS3Session initializes a new AWS S3 session and refreshes one if
+// necessary. As recommended in https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html,
+// sessions should be cached when possible. Sessions are safe to use
+// concurrently as long as the session isn't modified.
+func setupS3Session(s3Credentials config.S3Credentials, s3Config config.S3Config) (*session.Session, error) {
+ sessionCache.Lock()
+ defer sessionCache.Unlock()
+
+ if s, ok := sessionCache.sessions[s3Config]; ok && !s.isExpired() {
+ return s.session, nil
+ }
+
+ cfg := &aws.Config{
+ Region: aws.String(s3Config.Region),
+ S3ForcePathStyle: aws.Bool(s3Config.PathStyle),
+ }
+
+ // In case IAM profiles aren't being used, use the static credentials
+ if s3Credentials.AwsAccessKeyID != "" && s3Credentials.AwsSecretAccessKey != "" {
+ cfg.Credentials = credentials.NewStaticCredentials(s3Credentials.AwsAccessKeyID, s3Credentials.AwsSecretAccessKey, "")
+ }
+
+ if s3Config.Endpoint != "" {
+ cfg.Endpoint = aws.String(s3Config.Endpoint)
+ }
+
+ sess, err := session.NewSession(cfg)
+ if err != nil {
+ return nil, err
+ }
+
+ sessionCache.sessions[s3Config] = &s3Session{
+ expiry: time.Now().Add(sessionExpiration),
+ session: sess,
+ }
+
+ return sess, nil
+}
+
+func ResetS3Session(s3Config config.S3Config) {
+ sessionCache.Lock()
+ defer sessionCache.Unlock()
+
+ delete(sessionCache.sessions, s3Config)
+}
diff --git a/workhorse/internal/upload/destination/objectstore/s3_session_test.go b/workhorse/internal/upload/destination/objectstore/s3_session_test.go
new file mode 100644
index 00000000000..5d57b4f9af8
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/s3_session_test.go
@@ -0,0 +1,57 @@
+package objectstore
+
+import (
+ "testing"
+ "time"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
+)
+
+func TestS3SessionSetup(t *testing.T) {
+ credentials := config.S3Credentials{}
+ cfg := config.S3Config{Region: "us-west-1", PathStyle: true}
+
+ sess, err := setupS3Session(credentials, cfg)
+ require.NoError(t, err)
+
+ require.Equal(t, aws.StringValue(sess.Config.Region), "us-west-1")
+ require.True(t, aws.BoolValue(sess.Config.S3ForcePathStyle))
+
+ require.Equal(t, len(sessionCache.sessions), 1)
+ anotherConfig := cfg
+ _, err = setupS3Session(credentials, anotherConfig)
+ require.NoError(t, err)
+ require.Equal(t, len(sessionCache.sessions), 1)
+
+ ResetS3Session(cfg)
+}
+
+func TestS3SessionExpiry(t *testing.T) {
+ credentials := config.S3Credentials{}
+ cfg := config.S3Config{Region: "us-west-1", PathStyle: true}
+
+ sess, err := setupS3Session(credentials, cfg)
+ require.NoError(t, err)
+
+ require.Equal(t, aws.StringValue(sess.Config.Region), "us-west-1")
+ require.True(t, aws.BoolValue(sess.Config.S3ForcePathStyle))
+
+ firstSession, ok := sessionCache.sessions[cfg]
+ require.True(t, ok)
+ require.False(t, firstSession.isExpired())
+
+ firstSession.expiry = time.Now().Add(-1 * time.Second)
+ require.True(t, firstSession.isExpired())
+
+ _, err = setupS3Session(credentials, cfg)
+ require.NoError(t, err)
+
+ nextSession, ok := sessionCache.sessions[cfg]
+ require.True(t, ok)
+ require.False(t, nextSession.isExpired())
+
+ ResetS3Session(cfg)
+}
diff --git a/workhorse/internal/upload/destination/objectstore/test/consts.go b/workhorse/internal/upload/destination/objectstore/test/consts.go
new file mode 100644
index 00000000000..7a1bcc28d45
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/test/consts.go
@@ -0,0 +1,19 @@
+package test
+
+// Some useful constants for testing purposes
+const (
+ // ObjectContent is an example of textual content
+ ObjectContent = "TEST OBJECT CONTENT"
+ // ObjectSize is the ObjectContent size
+ ObjectSize = int64(len(ObjectContent))
+ // ObjectPath is an example remote object path (including the bucket name)
+ ObjectPath = "/bucket/object"
+ // ObjectMD5 is ObjectContent MD5 hash
+ ObjectMD5 = "42d000eea026ee0760677e506189cb33"
+ // ObjectSHA1 is ObjectContent SHA1 hash
+ ObjectSHA1 = "173cfd58c6b60cb910f68a26cbb77e3fc5017a6d"
+ // ObjectSHA256 is ObjectContent SHA256 hash
+ ObjectSHA256 = "b0257e9e657ef19b15eed4fbba975bd5238d651977564035ef91cb45693647aa"
+ // ObjectSHA512 is ObjectContent SHA512 hash
+ ObjectSHA512 = "51af8197db2047f7894652daa7437927bf831d5aa63f1b0b7277c4800b06f5e3057251f0e4c2d344ca8c2daf1ffc08a28dd3b2f5fe0e316d3fd6c3af58c34b97"
+)
diff --git a/workhorse/internal/upload/destination/objectstore/test/gocloud_stub.go b/workhorse/internal/upload/destination/objectstore/test/gocloud_stub.go
new file mode 100644
index 00000000000..cf22075e407
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/test/gocloud_stub.go
@@ -0,0 +1,47 @@
+package test
+
+import (
+ "context"
+ "io/ioutil"
+ "net/url"
+ "os"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gocloud.dev/blob"
+ "gocloud.dev/blob/fileblob"
+)
+
+type dirOpener struct {
+ tmpDir string
+}
+
+func (o *dirOpener) OpenBucketURL(ctx context.Context, u *url.URL) (*blob.Bucket, error) {
+ return fileblob.OpenBucket(o.tmpDir, nil)
+}
+
+func SetupGoCloudFileBucket(t *testing.T, scheme string) (m *blob.URLMux, bucketDir string, cleanup func()) {
+ tmpDir, err := ioutil.TempDir("", "")
+ require.NoError(t, err)
+
+ mux := new(blob.URLMux)
+ fake := &dirOpener{tmpDir: tmpDir}
+ mux.RegisterBucket(scheme, fake)
+ cleanup = func() {
+ os.RemoveAll(tmpDir)
+ }
+
+ return mux, tmpDir, cleanup
+}
+
+func GoCloudObjectExists(t *testing.T, bucketDir string, objectName string) {
+ bucket, err := fileblob.OpenBucket(bucketDir, nil)
+ require.NoError(t, err)
+
+ ctx, cancel := context.WithCancel(context.Background()) // lint:allow context.Background
+ defer cancel()
+
+ exists, err := bucket.Exists(ctx, objectName)
+ require.NoError(t, err)
+ require.True(t, exists)
+}
diff --git a/workhorse/internal/upload/destination/objectstore/test/objectstore_stub.go b/workhorse/internal/upload/destination/objectstore/test/objectstore_stub.go
new file mode 100644
index 00000000000..d51a2de7456
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/test/objectstore_stub.go
@@ -0,0 +1,278 @@
+package test
+
+import (
+ "crypto/md5"
+ "encoding/hex"
+ "encoding/xml"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "net/http/httptest"
+ "strconv"
+ "strings"
+ "sync"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
+)
+
+type partsEtagMap map[int]string
+
+// ObjectstoreStub is a testing implementation of ObjectStore.
+// Instead of storing objects, it only saves their md5sums.
+type ObjectstoreStub struct {
+ // bucket contains md5sum of uploaded objects
+ bucket map[string]string
+ // overwriteMD5 contains overwrites for md5sums that should be returned instead of the computed hash
+ overwriteMD5 map[string]string
+ // multipart is a map of MultipartUploads
+ multipart map[string]partsEtagMap
+ // headers stores the HTTP headers sent along with each request
+ headers map[string]*http.Header
+
+ puts int
+ deletes int
+
+ m sync.Mutex
+}
+
+// StartObjectStore will start an ObjectStore stub
+func StartObjectStore() (*ObjectstoreStub, *httptest.Server) {
+ return StartObjectStoreWithCustomMD5(make(map[string]string))
+}
+
+// StartObjectStoreWithCustomMD5 will start an ObjectStore stub: md5Hashes contains overwrites for md5sums that should be returned on PutObject
+func StartObjectStoreWithCustomMD5(md5Hashes map[string]string) (*ObjectstoreStub, *httptest.Server) {
+ os := &ObjectstoreStub{
+ bucket: make(map[string]string),
+ multipart: make(map[string]partsEtagMap),
+ overwriteMD5: make(map[string]string),
+ headers: make(map[string]*http.Header),
+ }
+
+ for k, v := range md5Hashes {
+ os.overwriteMD5[k] = v
+ }
+
+ return os, httptest.NewServer(os)
+}
+
+// PutsCnt counts PutObject invocations
+func (o *ObjectstoreStub) PutsCnt() int {
+ o.m.Lock()
+ defer o.m.Unlock()
+
+ return o.puts
+}
+
+// DeletesCnt counts DeleteObject invocations on valid objects
+func (o *ObjectstoreStub) DeletesCnt() int {
+ o.m.Lock()
+ defer o.m.Unlock()
+
+ return o.deletes
+}
+
+// GetObjectMD5 returns the calculated MD5 of the object uploaded to path.
+// It returns an empty string if no object has been uploaded to that path.
+func (o *ObjectstoreStub) GetObjectMD5(path string) string {
+ o.m.Lock()
+ defer o.m.Unlock()
+
+ return o.bucket[path]
+}
+
+// GetHeader returns a given HTTP header of the object uploaded to the path
+func (o *ObjectstoreStub) GetHeader(path, key string) string {
+ o.m.Lock()
+ defer o.m.Unlock()
+
+ if val, ok := o.headers[path]; ok {
+ return val.Get(key)
+ }
+
+ return ""
+}
+
+// InitiateMultipartUpload prepares the ObjectstoreStub to receive a MultipartUpload on path.
+// It returns an error if a MultipartUpload is already in progress on that path.
+// InitiateMultipartUpload is only used during test setup.
+// Workhorse's production code does not know how to initiate a multipart upload.
+//
+// Real S3 multipart uploads are more complicated than what we do here,
+// but this is enough to verify that workhorse's production code behaves as intended.
+func (o *ObjectstoreStub) InitiateMultipartUpload(path string) error {
+ o.m.Lock()
+ defer o.m.Unlock()
+
+ if o.multipart[path] != nil {
+ return fmt.Errorf("MultipartUpload for %q already in progress", path)
+ }
+
+ o.multipart[path] = make(partsEtagMap)
+ return nil
+}
+
+// IsMultipartUpload checks whether the given path has a MultipartUpload in progress
+func (o *ObjectstoreStub) IsMultipartUpload(path string) bool {
+ o.m.Lock()
+ defer o.m.Unlock()
+
+ return o.isMultipartUpload(path)
+}
+
+// isMultipartUpload is the lock-free version of IsMultipartUpload
+func (o *ObjectstoreStub) isMultipartUpload(path string) bool {
+ return o.multipart[path] != nil
+}
+
+func (o *ObjectstoreStub) removeObject(w http.ResponseWriter, r *http.Request) {
+ o.m.Lock()
+ defer o.m.Unlock()
+
+ objectPath := r.URL.Path
+ if o.isMultipartUpload(objectPath) {
+ o.deletes++
+ delete(o.multipart, objectPath)
+
+ w.WriteHeader(200)
+ } else if _, ok := o.bucket[objectPath]; ok {
+ o.deletes++
+ delete(o.bucket, objectPath)
+
+ w.WriteHeader(200)
+ } else {
+ w.WriteHeader(404)
+ }
+}
+
+func (o *ObjectstoreStub) putObject(w http.ResponseWriter, r *http.Request) {
+ o.m.Lock()
+ defer o.m.Unlock()
+
+ objectPath := r.URL.Path
+
+ etag, overwritten := o.overwriteMD5[objectPath]
+ if !overwritten {
+ hasher := md5.New()
+ io.Copy(hasher, r.Body)
+
+ checksum := hasher.Sum(nil)
+ etag = hex.EncodeToString(checksum)
+ }
+
+ o.headers[objectPath] = &r.Header
+ o.puts++
+ if o.isMultipartUpload(objectPath) {
+ pNumber := r.URL.Query().Get("partNumber")
+ idx, err := strconv.Atoi(pNumber)
+ if err != nil {
+ http.Error(w, fmt.Sprintf("malformed partNumber: %v", err), 400)
+ return
+ }
+
+ o.multipart[objectPath][idx] = etag
+ } else {
+ o.bucket[objectPath] = etag
+ }
+
+ w.Header().Set("ETag", etag)
+ w.WriteHeader(200)
+}
+
+func MultipartUploadInternalError() *objectstore.CompleteMultipartUploadError {
+ return &objectstore.CompleteMultipartUploadError{Code: "InternalError", Message: "malformed object path"}
+}
+
+func (o *ObjectstoreStub) completeMultipartUpload(w http.ResponseWriter, r *http.Request) {
+ o.m.Lock()
+ defer o.m.Unlock()
+
+ objectPath := r.URL.Path
+
+ multipart := o.multipart[objectPath]
+ if multipart == nil {
+ http.Error(w, "Unknown MultipartUpload", 404)
+ return
+ }
+
+ buf, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ http.Error(w, err.Error(), 500)
+ return
+ }
+
+ var msg objectstore.CompleteMultipartUpload
+ err = xml.Unmarshal(buf, &msg)
+ if err != nil {
+ http.Error(w, err.Error(), 400)
+ return
+ }
+
+ for _, part := range msg.Part {
+ etag := multipart[part.PartNumber]
+ if etag != part.ETag {
+ msg := fmt.Sprintf("ETag mismatch on part %d. Expected %q got %q", part.PartNumber, etag, part.ETag)
+ http.Error(w, msg, 400)
+ return
+ }
+ }
+
+ etag, overwritten := o.overwriteMD5[objectPath]
+ if !overwritten {
+ etag = "CompleteMultipartUploadETag"
+ }
+
+ o.bucket[objectPath] = etag
+ delete(o.multipart, objectPath)
+
+ w.Header().Set("ETag", etag)
+ split := strings.SplitN(objectPath[1:], "/", 2)
+ if len(split) < 2 {
+ encodeXMLAnswer(w, MultipartUploadInternalError())
+ return
+ }
+
+ bucket := split[0]
+ key := split[1]
+ answer := objectstore.CompleteMultipartUploadResult{
+ Location: r.URL.String(),
+ Bucket: bucket,
+ Key: key,
+ ETag: etag,
+ }
+ encodeXMLAnswer(w, answer)
+}
+
+func encodeXMLAnswer(w http.ResponseWriter, answer interface{}) {
+ w.Header().Set("Content-Type", "text/xml")
+
+ enc := xml.NewEncoder(w)
+ if err := enc.Encode(answer); err != nil {
+ http.Error(w, err.Error(), 500)
+ }
+}
+
+func (o *ObjectstoreStub) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if r.Body != nil {
+ defer r.Body.Close()
+ }
+
+ fmt.Println("ObjectStore Stub:", r.Method, r.URL.String())
+
+ if r.URL.Path == "" {
+ http.Error(w, "No path provided", 404)
+ return
+ }
+
+ switch r.Method {
+ case "DELETE":
+ o.removeObject(w, r)
+ case "PUT":
+ o.putObject(w, r)
+ case "POST":
+ o.completeMultipartUpload(w, r)
+ default:
+ w.WriteHeader(404)
+ }
+}
diff --git a/workhorse/internal/upload/destination/objectstore/test/objectstore_stub_test.go b/workhorse/internal/upload/destination/objectstore/test/objectstore_stub_test.go
new file mode 100644
index 00000000000..8c0d52a2d79
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/test/objectstore_stub_test.go
@@ -0,0 +1,167 @@
+package test
+
+import (
+ "fmt"
+ "io"
+ "net/http"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func doRequest(method, url string, body io.Reader) error {
+ req, err := http.NewRequest(method, url, body)
+ if err != nil {
+ return err
+ }
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return err
+ }
+
+ return resp.Body.Close()
+}
+
+func TestObjectStoreStub(t *testing.T) {
+ stub, ts := StartObjectStore()
+ defer ts.Close()
+
+ require.Equal(t, 0, stub.PutsCnt())
+ require.Equal(t, 0, stub.DeletesCnt())
+
+ objectURL := ts.URL + ObjectPath
+
+ require.NoError(t, doRequest(http.MethodPut, objectURL, strings.NewReader(ObjectContent)))
+
+ require.Equal(t, 1, stub.PutsCnt())
+ require.Equal(t, 0, stub.DeletesCnt())
+ require.Equal(t, ObjectMD5, stub.GetObjectMD5(ObjectPath))
+
+ require.NoError(t, doRequest(http.MethodDelete, objectURL, nil))
+
+ require.Equal(t, 1, stub.PutsCnt())
+ require.Equal(t, 1, stub.DeletesCnt())
+}
+
+func TestObjectStoreStubDelete404(t *testing.T) {
+ stub, ts := StartObjectStore()
+ defer ts.Close()
+
+ objectURL := ts.URL + ObjectPath
+
+ req, err := http.NewRequest(http.MethodDelete, objectURL, nil)
+ require.NoError(t, err)
+
+ resp, err := http.DefaultClient.Do(req)
+ require.NoError(t, err)
+ defer resp.Body.Close()
+ require.Equal(t, 404, resp.StatusCode)
+
+ require.Equal(t, 0, stub.DeletesCnt())
+}
+
+func TestObjectStoreInitiateMultipartUpload(t *testing.T) {
+ stub, ts := StartObjectStore()
+ defer ts.Close()
+
+ path := "/my-multipart"
+ err := stub.InitiateMultipartUpload(path)
+ require.NoError(t, err)
+
+ err = stub.InitiateMultipartUpload(path)
+ require.Error(t, err, "second attempt to open the same MultipartUpload")
+}
+
+func TestObjectStoreCompleteMultipartUpload(t *testing.T) {
+ stub, ts := StartObjectStore()
+ defer ts.Close()
+
+ objectURL := ts.URL + ObjectPath
+ parts := []struct {
+ number int
+ content string
+ contentMD5 string
+ }{
+ {
+ number: 1,
+ content: "first part",
+ contentMD5: "550cf6b6e60f65a0e3104a26e70fea42",
+ }, {
+ number: 2,
+ content: "second part",
+ contentMD5: "920b914bca0a70780b40881b8f376135",
+ },
+ }
+
+ stub.InitiateMultipartUpload(ObjectPath)
+
+ require.True(t, stub.IsMultipartUpload(ObjectPath))
+ require.Equal(t, 0, stub.PutsCnt())
+ require.Equal(t, 0, stub.DeletesCnt())
+
+ // Workhorse knows nothing about S3 MultipartUpload; it receives some URLs
+ // from GitLab Rails and PUTs a chunk of data to each of them.
+ // Then it completes the upload with a final POST.
+ partPutURLs := []string{
+ fmt.Sprintf("%s?partNumber=%d", objectURL, 1),
+ fmt.Sprintf("%s?partNumber=%d", objectURL, 2),
+ }
+ completePostURL := objectURL
+
+ for i, partPutURL := range partPutURLs {
+ part := parts[i]
+
+ require.NoError(t, doRequest(http.MethodPut, partPutURL, strings.NewReader(part.content)))
+
+ require.Equal(t, i+1, stub.PutsCnt())
+ require.Equal(t, 0, stub.DeletesCnt())
+ require.Equal(t, part.contentMD5, stub.multipart[ObjectPath][part.number], "Part %d was not uploaded into ObjectStorage", part.number)
+ require.Empty(t, stub.GetObjectMD5(ObjectPath), "Part %d was mistakenly uploaded as a single object", part.number)
+ require.True(t, stub.IsMultipartUpload(ObjectPath), "MultipartUpload completed or aborted")
+ }
+
+ completeBody := fmt.Sprintf(`<CompleteMultipartUpload>
+ <Part>
+ <PartNumber>1</PartNumber>
+ <ETag>%s</ETag>
+ </Part>
+ <Part>
+ <PartNumber>2</PartNumber>
+ <ETag>%s</ETag>
+ </Part>
+ </CompleteMultipartUpload>`, parts[0].contentMD5, parts[1].contentMD5)
+ require.NoError(t, doRequest(http.MethodPost, completePostURL, strings.NewReader(completeBody)))
+
+ require.Equal(t, len(parts), stub.PutsCnt())
+ require.Equal(t, 0, stub.DeletesCnt())
+ require.False(t, stub.IsMultipartUpload(ObjectPath), "MultipartUpload is still in progress")
+}
+
+func TestObjectStoreAbortMultipartUpload(t *testing.T) {
+ stub, ts := StartObjectStore()
+ defer ts.Close()
+
+ stub.InitiateMultipartUpload(ObjectPath)
+
+ require.True(t, stub.IsMultipartUpload(ObjectPath))
+ require.Equal(t, 0, stub.PutsCnt())
+ require.Equal(t, 0, stub.DeletesCnt())
+
+ objectURL := ts.URL + ObjectPath
+ require.NoError(t, doRequest(http.MethodPut, fmt.Sprintf("%s?partNumber=%d", objectURL, 1), strings.NewReader(ObjectContent)))
+
+ require.Equal(t, 1, stub.PutsCnt())
+ require.Equal(t, 0, stub.DeletesCnt())
+ require.Equal(t, ObjectMD5, stub.multipart[ObjectPath][1], "Part was not uploaded into ObjectStorage")
+ require.Empty(t, stub.GetObjectMD5(ObjectPath), "Part was mistakenly uploaded as a single object")
+ require.True(t, stub.IsMultipartUpload(ObjectPath), "MultipartUpload completed or aborted")
+
+ require.NoError(t, doRequest(http.MethodDelete, objectURL, nil))
+
+ require.Equal(t, 1, stub.PutsCnt())
+ require.Equal(t, 1, stub.DeletesCnt())
+ require.Empty(t, stub.GetObjectMD5(ObjectPath), "MultipartUpload has been completed")
+ require.False(t, stub.IsMultipartUpload(ObjectPath), "MultipartUpload is still in progress")
+}
diff --git a/workhorse/internal/upload/destination/objectstore/test/s3_stub.go b/workhorse/internal/upload/destination/objectstore/test/s3_stub.go
new file mode 100644
index 00000000000..6b83426b852
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/test/s3_stub.go
@@ -0,0 +1,142 @@
+package test
+
+import (
+ "io/ioutil"
+ "net/http/httptest"
+ "os"
+ "strings"
+ "testing"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/credentials"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
+
+ "github.com/aws/aws-sdk-go/service/s3"
+ "github.com/aws/aws-sdk-go/service/s3/s3manager"
+
+ "github.com/johannesboyne/gofakes3"
+ "github.com/johannesboyne/gofakes3/backend/s3mem"
+)
+
+func SetupS3(t *testing.T, encryption string) (config.S3Credentials, config.S3Config, *session.Session, *httptest.Server) {
+ return SetupS3WithBucket(t, "test-bucket", encryption)
+}
+
+func SetupS3WithBucket(t *testing.T, bucket string, encryption string) (config.S3Credentials, config.S3Config, *session.Session, *httptest.Server) {
+ backend := s3mem.New()
+ faker := gofakes3.New(backend)
+ ts := httptest.NewServer(faker.Server())
+
+ creds := config.S3Credentials{
+ AwsAccessKeyID: "YOUR-ACCESSKEYID",
+ AwsSecretAccessKey: "YOUR-SECRETACCESSKEY",
+ }
+
+ config := config.S3Config{
+ Bucket: bucket,
+ Endpoint: ts.URL,
+ Region: "eu-central-1",
+ PathStyle: true,
+ }
+
+ if encryption != "" {
+ config.ServerSideEncryption = encryption
+
+ if encryption == s3.ServerSideEncryptionAwsKms {
+ config.SSEKMSKeyID = "arn:aws:1234"
+ }
+ }
+
+ sess, err := session.NewSession(&aws.Config{
+ Credentials: credentials.NewStaticCredentials(creds.AwsAccessKeyID, creds.AwsSecretAccessKey, ""),
+ Endpoint: aws.String(ts.URL),
+ Region: aws.String(config.Region),
+ DisableSSL: aws.Bool(true),
+ S3ForcePathStyle: aws.Bool(true),
+ })
+ require.NoError(t, err)
+
+ // Create S3 service client
+ svc := s3.New(sess)
+
+ _, err = svc.CreateBucket(&s3.CreateBucketInput{
+ Bucket: aws.String(bucket),
+ })
+ require.NoError(t, err)
+
+ return creds, config, sess, ts
+}
+
+// S3ObjectExists will fail the test if the file does not exist.
+func S3ObjectExists(t *testing.T, sess *session.Session, config config.S3Config, objectName string, expectedBytes string) {
+ downloadObject(t, sess, config, objectName, func(tmpfile *os.File, numBytes int64, err error) {
+ require.NoError(t, err)
+ require.Equal(t, int64(len(expectedBytes)), numBytes)
+
+ output, err := ioutil.ReadFile(tmpfile.Name())
+ require.NoError(t, err)
+
+ require.Equal(t, []byte(expectedBytes), output)
+ })
+}
+
+func CheckS3Metadata(t *testing.T, sess *session.Session, config config.S3Config, objectName string) {
+ // In a real S3 provider, s3crypto.NewDecryptionClient should probably be used
+ svc := s3.New(sess)
+ result, err := svc.GetObject(&s3.GetObjectInput{
+ Bucket: aws.String(config.Bucket),
+ Key: aws.String(objectName),
+ })
+ require.NoError(t, err)
+
+ if config.ServerSideEncryption != "" {
+ require.Equal(t, aws.String(config.ServerSideEncryption), result.ServerSideEncryption)
+
+ if config.ServerSideEncryption == s3.ServerSideEncryptionAwsKms {
+ require.Equal(t, aws.String(config.SSEKMSKeyID), result.SSEKMSKeyId)
+ } else {
+ require.Nil(t, result.SSEKMSKeyId)
+ }
+ } else {
+ require.Nil(t, result.ServerSideEncryption)
+ require.Nil(t, result.SSEKMSKeyId)
+ }
+}
+
+// S3ObjectDoesNotExist returns true if the object has been deleted,
+// false otherwise. The return signature is different from
+// S3ObjectExists because deletion may need to be retried: deferred
+// clean up calls may cause the actual deletion to happen after the
+// initial check.
+func S3ObjectDoesNotExist(t *testing.T, sess *session.Session, config config.S3Config, objectName string) bool {
+ deleted := false
+
+ downloadObject(t, sess, config, objectName, func(tmpfile *os.File, numBytes int64, err error) {
+ if err != nil && strings.Contains(err.Error(), "NoSuchKey") {
+ deleted = true
+ }
+ })
+
+ return deleted
+}
+
+func downloadObject(t *testing.T, sess *session.Session, config config.S3Config, objectName string, handler func(tmpfile *os.File, numBytes int64, err error)) {
+ tmpDir, err := ioutil.TempDir("", "workhorse-test-")
+ require.NoError(t, err)
+ defer os.Remove(tmpDir)
+
+ tmpfile, err := ioutil.TempFile(tmpDir, "s3-output")
+ require.NoError(t, err)
+ defer os.Remove(tmpfile.Name())
+
+ downloadSvc := s3manager.NewDownloader(sess)
+ numBytes, err := downloadSvc.Download(tmpfile, &s3.GetObjectInput{
+ Bucket: aws.String(config.Bucket),
+ Key: aws.String(objectName),
+ })
+
+ handler(tmpfile, numBytes, err)
+}
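
Taken together, these helpers let a test stand up a fake S3 server, push an object through it, and assert on the result. A minimal sketch, assuming an illustrative key and payload (the helper signatures are those defined above):

package test

import (
	"strings"
	"testing"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func TestS3StubRoundTrip(t *testing.T) {
	// Start the gofakes3-backed server and create the test bucket.
	_, cfg, sess, ts := SetupS3(t, "")
	defer ts.Close()

	// Upload an object through the fake server.
	uploader := s3manager.NewUploader(sess)
	_, err := uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String(cfg.Bucket),
		Key:    aws.String("tmp/test-object"),
		Body:   strings.NewReader("hello"),
	})
	if err != nil {
		t.Fatal(err)
	}

	// Assert presence and contents via the helper above.
	S3ObjectExists(t, sess, cfg, "tmp/test-object", "hello")
}
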
diff --git a/workhorse/internal/upload/destination/objectstore/upload_strategy.go b/workhorse/internal/upload/destination/objectstore/upload_strategy.go
new file mode 100644
index 00000000000..5707ba5f24e
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/upload_strategy.go
@@ -0,0 +1,46 @@
+package objectstore
+
+import (
+ "context"
+ "io"
+ "net/http"
+
+ "gitlab.com/gitlab-org/labkit/log"
+ "gitlab.com/gitlab-org/labkit/mask"
+)
+
+type uploadStrategy interface {
+ Upload(ctx context.Context, r io.Reader) error
+ ETag() string
+ Abort()
+ Delete()
+}
+
+func deleteURL(url string) {
+ if url == "" {
+ return
+ }
+
+ req, err := http.NewRequest("DELETE", url, nil)
+ if err != nil {
+ log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed")
+ return
+ }
+ // TODO: consider adding the context to the outgoing request for better instrumentation
+
+ // here we are not using u.ctx because we must perform cleanup regardless of parent context
+ resp, err := httpClient.Do(req)
+ if err != nil {
+ log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed")
+ return
+ }
+ resp.Body.Close()
+}
+
+func extractETag(rawETag string) string {
+ // Strip the surrounding double quotes, guarding the slice bounds
+ // against a malformed one-character value.
+ if len(rawETag) >= 2 && rawETag[0] == '"' && rawETag[len(rawETag)-1] == '"' {
+ rawETag = rawETag[1 : len(rawETag)-1]
+ }
+
+ return rawETag
+}
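
The uploadStrategy interface is the seam that keeps the uploader below agnostic of the storage backend: S3 multipart, plain PUT, and GoCloud uploads all hide behind it. As a rough sketch (not part of Workhorse), a hypothetical in-memory strategy could satisfy the interface like this, reporting an MD5-based ETag the way S3 does for single-part uploads:

package objectstore

import (
	"bytes"
	"context"
	"crypto/md5"
	"encoding/hex"
	"io"
)

// inMemoryStrategy is a hypothetical strategy that buffers the upload
// in memory; illustrative only.
type inMemoryStrategy struct {
	buf bytes.Buffer
}

func (s *inMemoryStrategy) Upload(ctx context.Context, r io.Reader) error {
	_, err := io.Copy(&s.buf, r)
	return err
}

// ETag mimics S3 single-part uploads, where the ETag is the MD5 of the body.
func (s *inMemoryStrategy) ETag() string {
	sum := md5.Sum(s.buf.Bytes())
	return hex.EncodeToString(sum[:])
}

func (s *inMemoryStrategy) Abort()  {} // nothing to abort in memory
func (s *inMemoryStrategy) Delete() {} // nothing to delete in memory
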
diff --git a/workhorse/internal/upload/destination/objectstore/uploader.go b/workhorse/internal/upload/destination/objectstore/uploader.go
new file mode 100644
index 00000000000..aedfbe55ead
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/uploader.go
@@ -0,0 +1,115 @@
+package objectstore
+
+import (
+ "context"
+ "crypto/md5"
+ "encoding/hex"
+ "fmt"
+ "hash"
+ "io"
+ "strings"
+ "time"
+
+ "gitlab.com/gitlab-org/labkit/log"
+)
+
+// uploader consumes an io.Reader and uploads it using a pluggable uploadStrategy.
+type uploader struct {
+ strategy uploadStrategy
+
+ // In the case of S3, a multipart upload instantiates uploads for
+ // the individual parts. We don't want to increment metrics for the
+ // individual parts, hence this boolean flag.
+ metrics bool
+
+ // With S3 we compare the MD5 of the data we sent with the ETag returned
+ // by the object storage server.
+ checkETag bool
+}
+
+func newUploader(strategy uploadStrategy) *uploader {
+ return &uploader{strategy: strategy, metrics: true}
+}
+
+func newETagCheckUploader(strategy uploadStrategy, metrics bool) *uploader {
+ return &uploader{strategy: strategy, metrics: metrics, checkETag: true}
+}
+
+func hexString(h hash.Hash) string { return hex.EncodeToString(h.Sum(nil)) }
+
+// Consume reads the reader until it reaches EOF or an error. It spawns a
+// goroutine that waits for outerCtx to be done, after which the remote
+// file is deleted. The deadline applies to the upload performed inside
+// Consume, not to outerCtx.
+func (u *uploader) Consume(outerCtx context.Context, reader io.Reader, deadline time.Time) (_ int64, err error) {
+ if u.metrics {
+ objectStorageUploadsOpen.Inc()
+ defer func(started time.Time) {
+ objectStorageUploadsOpen.Dec()
+ objectStorageUploadTime.Observe(time.Since(started).Seconds())
+ if err != nil {
+ objectStorageUploadRequestsRequestFailed.Inc()
+ }
+ }(time.Now())
+ }
+
+ defer func() {
+ // We do this mainly to abort S3 multipart uploads: it is not enough to
+ // "delete" them.
+ if err != nil {
+ u.strategy.Abort()
+ }
+ }()
+
+ go func() {
+ // Once gitlab-rails is done handling the request, we are supposed to
+ // delete the upload from its temporary location.
+ <-outerCtx.Done()
+ u.strategy.Delete()
+ }()
+
+ uploadCtx, cancelFn := context.WithDeadline(outerCtx, deadline)
+ defer cancelFn()
+
+ var hasher hash.Hash
+ if u.checkETag {
+ hasher = md5.New()
+ reader = io.TeeReader(reader, hasher)
+ }
+
+ cr := &countReader{r: reader}
+ if err := u.strategy.Upload(uploadCtx, cr); err != nil {
+ return cr.n, err
+ }
+
+ if u.checkETag {
+ if err := compareMD5(hexString(hasher), u.strategy.ETag()); err != nil {
+ log.ContextLogger(uploadCtx).WithError(err).Error("error comparing MD5 checksum")
+ return cr.n, err
+ }
+ }
+
+ objectStorageUploadBytes.Add(float64(cr.n))
+
+ return cr.n, nil
+}
+
+func compareMD5(local, remote string) error {
+ if !strings.EqualFold(local, remote) {
+ return fmt.Errorf("ETag mismatch. expected %q got %q", local, remote)
+ }
+
+ return nil
+}
+
+type countReader struct {
+ r io.Reader
+ n int64
+}
+
+func (cr *countReader) Read(p []byte) (int, error) {
+ nRead, err := cr.r.Read(p)
+ cr.n += int64(nRead)
+ return nRead, err
+}
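
To see how these pieces interact, the sketch below drives Consume with the hypothetical inMemoryStrategy from the previous example (an assumption for illustration, not Workhorse code); since that strategy's ETag is the MD5 of the received bytes, the checkETag path passes:

package objectstore

import (
	"context"
	"strings"
	"testing"
	"time"
)

func TestConsumeWithInMemoryStrategy(t *testing.T) {
	u := newETagCheckUploader(&inMemoryStrategy{}, false) // metrics disabled

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // cancellation triggers the deferred strategy.Delete()

	n, err := u.Consume(ctx, strings.NewReader("hello world"), time.Now().Add(time.Minute))
	if err != nil {
		t.Fatal(err)
	}
	if want := int64(len("hello world")); n != want {
		t.Fatalf("expected %d bytes consumed, got %d", want, n)
	}
}
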
diff --git a/workhorse/internal/upload/destination/reader.go b/workhorse/internal/upload/destination/reader.go
new file mode 100644
index 00000000000..925a9468e14
--- /dev/null
+++ b/workhorse/internal/upload/destination/reader.go
@@ -0,0 +1,17 @@
+package destination
+
+import "io"
+
+type hardLimitReader struct {
+ r io.Reader
+ n int64
+}
+
+func (h *hardLimitReader) Read(p []byte) (int, error) {
+ nRead, err := h.r.Read(p)
+ h.n -= int64(nRead)
+ if h.n < 0 {
+ err = ErrEntityTooLarge
+ }
+ return nRead, err
+}
diff --git a/workhorse/internal/upload/destination/reader_test.go b/workhorse/internal/upload/destination/reader_test.go
new file mode 100644
index 00000000000..a26f7746a13
--- /dev/null
+++ b/workhorse/internal/upload/destination/reader_test.go
@@ -0,0 +1,46 @@
+package destination
+
+import (
+ "fmt"
+ "io/ioutil"
+ "strings"
+ "testing"
+ "testing/iotest"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestHardLimitReader(t *testing.T) {
+ const text = "hello world"
+ r := iotest.OneByteReader(
+ &hardLimitReader{
+ r: strings.NewReader(text),
+ n: int64(len(text)),
+ },
+ )
+
+ out, err := ioutil.ReadAll(r)
+ require.NoError(t, err)
+ require.Equal(t, text, string(out))
+}
+
+func TestHardLimitReaderFail(t *testing.T) {
+ const text = "hello world"
+
+ for bufSize := len(text) / 2; bufSize < len(text)*2; bufSize++ {
+ t.Run(fmt.Sprintf("bufsize:%d", bufSize), func(t *testing.T) {
+ r := &hardLimitReader{
+ r: iotest.DataErrReader(strings.NewReader(text)),
+ n: int64(len(text)) - 1,
+ }
+ buf := make([]byte, bufSize)
+
+ var err error
+ for i := 0; err == nil && i < 1000; i++ {
+ _, err = r.Read(buf)
+ }
+
+ require.Equal(t, ErrEntityTooLarge, err)
+ })
+ }
+}
diff --git a/workhorse/internal/upload/destination/upload_opts.go b/workhorse/internal/upload/destination/upload_opts.go
new file mode 100644
index 00000000000..750a79d7bc2
--- /dev/null
+++ b/workhorse/internal/upload/destination/upload_opts.go
@@ -0,0 +1,171 @@
+package destination
+
+import (
+ "errors"
+ "strings"
+ "time"
+
+ "gocloud.dev/blob"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
+)
+
+// DefaultObjectStoreTimeout is the timeout for an ObjectStore upload operation
+const DefaultObjectStoreTimeout = 4 * time.Hour
+
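+// ObjectStorageConfig holds the credentials and settings Workhorse
+// needs to access an object storage provider directly.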
+type ObjectStorageConfig struct {
+ Provider string
+
+ S3Credentials config.S3Credentials
+ S3Config config.S3Config
+
+ // GoCloud mux that maps azureblob:// and future URLs (e.g. s3://, gcs://, etc.) to a handler
+ URLMux *blob.URLMux
+
+ // Azure credentials are registered at startup in the GoCloud URLMux, so only the container name is needed
+ GoCloudConfig config.GoCloudConfig
+}
+
+// UploadOpts represents all the options available for saving a file to object store
+type UploadOpts struct {
+ // TempFilePrefix is the prefix used to create a temporary local file
+ TempFilePrefix string
+ // LocalTempPath is the directory where a local copy of the file is written
+ LocalTempPath string
+ // RemoteID is the remote ObjectID provided by GitLab
+ RemoteID string
+ // RemoteURL is the final URL of the file
+ RemoteURL string
+ // PresignedPut is a presigned S3 PutObject compatible URL
+ PresignedPut string
+ // PresignedDelete is a presigned S3 DeleteObject compatible URL.
+ PresignedDelete string
+ // PutHeaders are the HTTP headers to be sent along with the PUT request
+ PutHeaders map[string]string
+ // UseWorkhorseClient determines whether to ignore Rails pre-signed URLs and have Workhorse access the object storage provider directly
+ UseWorkhorseClient bool
+ // If UseWorkhorseClient is true, this is the temporary object name to store the file
+ RemoteTempObjectID string
+ // Workhorse object storage client (e.g. S3) parameters
+ ObjectStorageConfig ObjectStorageConfig
+ // Deadline is the S3 operation deadline; the upload will be aborted if not completed in time
+ Deadline time.Time
+ // The maximum accepted size in bytes of the upload
+ MaximumSize int64
+
+ // MultipartUpload parameters
+ // PartSize is the exact size of each uploaded part. Only the last one can be smaller
+ PartSize int64
+ // PresignedParts contains the presigned URLs for each part
+ PresignedParts []string
+ // PresignedCompleteMultipart is a presigned URL for CompleteMultipartUpload
+ PresignedCompleteMultipart string
+ // PresignedAbortMultipart is a presigned URL for AbortMultipartUpload
+ PresignedAbortMultipart string
+}
+
+// UseWorkhorseClientEnabled checks if the options require direct access to object storage
+func (s *UploadOpts) UseWorkhorseClientEnabled() bool {
+ return s.UseWorkhorseClient && s.ObjectStorageConfig.IsValid() && s.RemoteTempObjectID != ""
+}
+
+// IsLocal checks if the options require writing the file to disk
+func (s *UploadOpts) IsLocal() bool {
+ return s.LocalTempPath != ""
+}
+
+// IsMultipart checks if the options require a multipart upload
+func (s *UploadOpts) IsMultipart() bool {
+ return s.PartSize > 0
+}
+
+// GetOpts converts a GitLab api.Response into UploadOpts
+func GetOpts(apiResponse *api.Response) (*UploadOpts, error) {
+ timeout := time.Duration(apiResponse.RemoteObject.Timeout) * time.Second
+ if timeout == 0 {
+ timeout = DefaultObjectStoreTimeout
+ }
+
+ opts := UploadOpts{
+ LocalTempPath: apiResponse.TempPath,
+ RemoteID: apiResponse.RemoteObject.ID,
+ RemoteURL: apiResponse.RemoteObject.GetURL,
+ PresignedPut: apiResponse.RemoteObject.StoreURL,
+ PresignedDelete: apiResponse.RemoteObject.DeleteURL,
+ PutHeaders: apiResponse.RemoteObject.PutHeaders,
+ UseWorkhorseClient: apiResponse.RemoteObject.UseWorkhorseClient,
+ RemoteTempObjectID: apiResponse.RemoteObject.RemoteTempObjectID,
+ Deadline: time.Now().Add(timeout),
+ MaximumSize: apiResponse.MaximumSize,
+ }
+
+ if opts.LocalTempPath != "" && opts.RemoteID != "" {
+ return nil, errors.New("API response has both TempPath and RemoteObject")
+ }
+
+ if opts.LocalTempPath == "" && opts.RemoteID == "" {
+ return nil, errors.New("API response has neither TempPath nor RemoteObject")
+ }
+
+ objectStorageParams := apiResponse.RemoteObject.ObjectStorage
+ if opts.UseWorkhorseClient && objectStorageParams != nil {
+ opts.ObjectStorageConfig.Provider = objectStorageParams.Provider
+ opts.ObjectStorageConfig.S3Config = objectStorageParams.S3Config
+ opts.ObjectStorageConfig.GoCloudConfig = objectStorageParams.GoCloudConfig
+ }
+
+ // Backwards compatibility to ensure API servers that do not include the
+ // CustomPutHeaders flag will default to the original content type.
+ if !apiResponse.RemoteObject.CustomPutHeaders {
+ opts.PutHeaders = make(map[string]string)
+ opts.PutHeaders["Content-Type"] = "application/octet-stream"
+ }
+
+ if multiParams := apiResponse.RemoteObject.MultipartUpload; multiParams != nil {
+ opts.PartSize = multiParams.PartSize
+ opts.PresignedCompleteMultipart = multiParams.CompleteURL
+ opts.PresignedAbortMultipart = multiParams.AbortURL
+ opts.PresignedParts = append([]string(nil), multiParams.PartURLs...)
+ }
+
+ return &opts, nil
+}
+
+func (c *ObjectStorageConfig) IsAWS() bool {
+ return strings.EqualFold(c.Provider, "AWS") || strings.EqualFold(c.Provider, "S3")
+}
+
+func (c *ObjectStorageConfig) IsAzure() bool {
+ return strings.EqualFold(c.Provider, "AzureRM")
+}
+
+func (c *ObjectStorageConfig) IsGoCloud() bool {
+ return c.GoCloudConfig.URL != ""
+}
+
+func (c *ObjectStorageConfig) IsValid() bool {
+ if c.IsAWS() {
+ return c.S3Config.Bucket != "" && c.s3CredentialsValid()
+ } else if c.IsGoCloud() {
+ // We could parse and validate the URL, but GoCloud providers
+ // such as AzureRM don't have a fallback to normal HTTP, so we
+ // always want to try the GoCloud path if there is a URL.
+ return true
+ }
+
+ return false
+}
+
+func (c *ObjectStorageConfig) s3CredentialsValid() bool {
+ // We need to be able to distinguish between two cases of AWS access:
+ // 1. AWS access via key and secret, but credentials not configured in Workhorse
+ // 2. IAM instance profiles used
+ if c.S3Config.UseIamProfile {
+ return true
+ } else if c.S3Credentials.AwsAccessKeyID != "" && c.S3Credentials.AwsSecretAccessKey != "" {
+ return true
+ }
+
+ return false
+}
diff --git a/workhorse/internal/upload/destination/upload_opts_test.go b/workhorse/internal/upload/destination/upload_opts_test.go
new file mode 100644
index 00000000000..fde726c985d
--- /dev/null
+++ b/workhorse/internal/upload/destination/upload_opts_test.go
@@ -0,0 +1,342 @@
+package destination_test
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
+)
+
+func TestUploadOptsLocalAndRemote(t *testing.T) {
+ tests := []struct {
+ name string
+ localTempPath string
+ presignedPut string
+ partSize int64
+ isLocal bool
+ isRemote bool
+ isMultipart bool
+ }{
+ {
+ name: "Only LocalTempPath",
+ localTempPath: "/tmp",
+ isLocal: true,
+ },
+ {
+ name: "No paths",
+ },
+ {
+ name: "Only remoteUrl",
+ presignedPut: "http://example.com",
+ },
+ {
+ name: "Multipart",
+ partSize: 10,
+ isMultipart: true,
+ },
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ opts := destination.UploadOpts{
+ LocalTempPath: test.localTempPath,
+ PresignedPut: test.presignedPut,
+ PartSize: test.partSize,
+ }
+
+ require.Equal(t, test.isLocal, opts.IsLocal(), "IsLocal() mismatch")
+ require.Equal(t, test.isMultipart, opts.IsMultipart(), "IsMultipart() mismatch")
+ })
+ }
+}
+
+func TestGetOpts(t *testing.T) {
+ tests := []struct {
+ name string
+ multipart *api.MultipartUploadParams
+ customPutHeaders bool
+ putHeaders map[string]string
+ }{
+ {
+ name: "Single upload",
+ },
+ {
+ name: "Multipart upload",
+ multipart: &api.MultipartUploadParams{
+ PartSize: 10,
+ CompleteURL: "http://complete",
+ AbortURL: "http://abort",
+ PartURLs: []string{"http://part1", "http://part2"},
+ },
+ },
+ {
+ name: "Single upload with custom content type",
+ customPutHeaders: true,
+ putHeaders: map[string]string{"Content-Type": "image/jpeg"},
+ }, {
+ name: "Multipart upload with custom content type",
+ multipart: &api.MultipartUploadParams{
+ PartSize: 10,
+ CompleteURL: "http://complete",
+ AbortURL: "http://abort",
+ PartURLs: []string{"http://part1", "http://part2"},
+ },
+ customPutHeaders: true,
+ putHeaders: map[string]string{"Content-Type": "image/jpeg"},
+ },
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ apiResponse := &api.Response{
+ RemoteObject: api.RemoteObject{
+ Timeout: 10,
+ ID: "id",
+ GetURL: "http://get",
+ StoreURL: "http://store",
+ DeleteURL: "http://delete",
+ MultipartUpload: test.multipart,
+ CustomPutHeaders: test.customPutHeaders,
+ PutHeaders: test.putHeaders,
+ },
+ }
+ deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second)
+ opts, err := destination.GetOpts(apiResponse)
+ require.NoError(t, err)
+
+ require.Equal(t, apiResponse.TempPath, opts.LocalTempPath)
+ require.WithinDuration(t, deadline, opts.Deadline, time.Second)
+ require.Equal(t, apiResponse.RemoteObject.ID, opts.RemoteID)
+ require.Equal(t, apiResponse.RemoteObject.GetURL, opts.RemoteURL)
+ require.Equal(t, apiResponse.RemoteObject.StoreURL, opts.PresignedPut)
+ require.Equal(t, apiResponse.RemoteObject.DeleteURL, opts.PresignedDelete)
+ if test.customPutHeaders {
+ require.Equal(t, opts.PutHeaders, apiResponse.RemoteObject.PutHeaders)
+ } else {
+ require.Equal(t, opts.PutHeaders, map[string]string{"Content-Type": "application/octet-stream"})
+ }
+
+ if test.multipart == nil {
+ require.False(t, opts.IsMultipart())
+ require.Empty(t, opts.PresignedCompleteMultipart)
+ require.Empty(t, opts.PresignedAbortMultipart)
+ require.Zero(t, opts.PartSize)
+ require.Empty(t, opts.PresignedParts)
+ } else {
+ require.True(t, opts.IsMultipart())
+ require.Equal(t, test.multipart.CompleteURL, opts.PresignedCompleteMultipart)
+ require.Equal(t, test.multipart.AbortURL, opts.PresignedAbortMultipart)
+ require.Equal(t, test.multipart.PartSize, opts.PartSize)
+ require.Equal(t, test.multipart.PartURLs, opts.PresignedParts)
+ }
+ })
+ }
+}
+
+func TestGetOptsFail(t *testing.T) {
+ testCases := []struct {
+ desc string
+ in *api.Response
+ }{
+ {
+ desc: "neither local nor remote",
+ in: &api.Response{},
+ },
+ {
+ desc: "both local and remote",
+ in: &api.Response{TempPath: "/foobar", RemoteObject: api.RemoteObject{ID: "id"}},
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ _, err := destination.GetOpts(tc.in)
+ require.Error(t, err, "expect input to be rejected")
+ })
+ }
+}
+
+func TestGetOptsDefaultTimeout(t *testing.T) {
+ deadline := time.Now().Add(destination.DefaultObjectStoreTimeout)
+ opts, err := destination.GetOpts(&api.Response{TempPath: "/foo/bar"})
+ require.NoError(t, err)
+
+ require.WithinDuration(t, deadline, opts.Deadline, time.Minute)
+}
+
+func TestUseWorkhorseClientEnabled(t *testing.T) {
+ cfg := destination.ObjectStorageConfig{
+ Provider: "AWS",
+ S3Config: config.S3Config{
+ Bucket: "test-bucket",
+ Region: "test-region",
+ },
+ S3Credentials: config.S3Credentials{
+ AwsAccessKeyID: "test-key",
+ AwsSecretAccessKey: "test-secret",
+ },
+ }
+
+ missingCfg := cfg
+ missingCfg.S3Credentials = config.S3Credentials{}
+
+ iamConfig := missingCfg
+ iamConfig.S3Config.UseIamProfile = true
+
+ missingRegion := cfg
+ missingRegion.S3Config.Region = ""
+
+ tests := []struct {
+ name string
+ UseWorkhorseClient bool
+ remoteTempObjectID string
+ objectStorageConfig destination.ObjectStorageConfig
+ expected bool
+ }{
+ {
+ name: "all direct access settings used",
+ UseWorkhorseClient: true,
+ remoteTempObjectID: "test-object",
+ objectStorageConfig: cfg,
+ expected: true,
+ },
+ {
+ name: "missing AWS credentials",
+ UseWorkhorseClient: true,
+ remoteTempObjectID: "test-object",
+ objectStorageConfig: missingCfg,
+ expected: false,
+ },
+ {
+ name: "direct access disabled",
+ UseWorkhorseClient: false,
+ remoteTempObjectID: "test-object",
+ objectStorageConfig: cfg,
+ expected: false,
+ },
+ {
+ name: "with IAM instance profile",
+ UseWorkhorseClient: true,
+ remoteTempObjectID: "test-object",
+ objectStorageConfig: iamConfig,
+ expected: true,
+ },
+ {
+ name: "missing remote temp object ID",
+ UseWorkhorseClient: true,
+ remoteTempObjectID: "",
+ objectStorageConfig: cfg,
+ expected: false,
+ },
+ {
+ name: "missing S3 config",
+ UseWorkhorseClient: true,
+ remoteTempObjectID: "test-object",
+ expected: false,
+ },
+ {
+ name: "missing S3 bucket",
+ UseWorkhorseClient: true,
+ remoteTempObjectID: "test-object",
+ objectStorageConfig: destination.ObjectStorageConfig{
+ Provider: "AWS",
+ S3Config: config.S3Config{},
+ },
+ expected: false,
+ },
+ {
+ name: "missing S3 region",
+ UseWorkhorseClient: true,
+ remoteTempObjectID: "test-object",
+ objectStorageConfig: missingRegion,
+ expected: true,
+ },
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ apiResponse := &api.Response{
+ RemoteObject: api.RemoteObject{
+ Timeout: 10,
+ ID: "id",
+ UseWorkhorseClient: test.UseWorkhorseClient,
+ RemoteTempObjectID: test.remoteTempObjectID,
+ },
+ }
+ deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second)
+ opts, err := destination.GetOpts(apiResponse)
+ require.NoError(t, err)
+ opts.ObjectStorageConfig = test.objectStorageConfig
+
+ require.Equal(t, apiResponse.TempPath, opts.LocalTempPath)
+ require.WithinDuration(t, deadline, opts.Deadline, time.Second)
+ require.Equal(t, apiResponse.RemoteObject.ID, opts.RemoteID)
+ require.Equal(t, apiResponse.RemoteObject.UseWorkhorseClient, opts.UseWorkhorseClient)
+ require.Equal(t, test.expected, opts.UseWorkhorseClientEnabled())
+ })
+ }
+}
+
+func TestGoCloudConfig(t *testing.T) {
+ mux, _, cleanup := test.SetupGoCloudFileBucket(t, "azblob")
+ defer cleanup()
+
+ tests := []struct {
+ name string
+ provider string
+ url string
+ valid bool
+ }{
+ {
+ name: "valid AzureRM config",
+ provider: "AzureRM",
+ url: "azblob:://test-container",
+ valid: true,
+ },
+ {
+ name: "invalid GoCloud scheme",
+ provider: "AzureRM",
+ url: "unknown:://test-container",
+ valid: true,
+ },
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ apiResponse := &api.Response{
+ RemoteObject: api.RemoteObject{
+ Timeout: 10,
+ ID: "id",
+ UseWorkhorseClient: true,
+ RemoteTempObjectID: "test-object",
+ ObjectStorage: &api.ObjectStorageParams{
+ Provider: test.provider,
+ GoCloudConfig: config.GoCloudConfig{
+ URL: test.url,
+ },
+ },
+ },
+ }
+ deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second)
+ opts, err := destination.GetOpts(apiResponse)
+ require.NoError(t, err)
+ opts.ObjectStorageConfig.URLMux = mux
+
+ require.Equal(t, apiResponse.TempPath, opts.LocalTempPath)
+ require.Equal(t, apiResponse.RemoteObject.RemoteTempObjectID, opts.RemoteTempObjectID)
+ require.WithinDuration(t, deadline, opts.Deadline, time.Second)
+ require.Equal(t, apiResponse.RemoteObject.ID, opts.RemoteID)
+ require.Equal(t, apiResponse.RemoteObject.UseWorkhorseClient, opts.UseWorkhorseClient)
+ require.Equal(t, test.provider, opts.ObjectStorageConfig.Provider)
+ require.Equal(t, apiResponse.RemoteObject.ObjectStorage.GoCloudConfig, opts.ObjectStorageConfig.GoCloudConfig)
+ require.True(t, opts.UseWorkhorseClientEnabled())
+ require.Equal(t, test.valid, opts.ObjectStorageConfig.IsValid())
+ require.False(t, opts.IsLocal())
+ })
+ }
+}
diff --git a/workhorse/internal/upload/lfs_preparer.go b/workhorse/internal/upload/lfs_preparer.go
new file mode 100644
index 00000000000..e7c5cf16a30
--- /dev/null
+++ b/workhorse/internal/upload/lfs_preparer.go
@@ -0,0 +1,47 @@
+package upload
+
+import (
+ "fmt"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
+)
+
+type object struct {
+ size int64
+ oid string
+}
+
+func (l *object) Verify(fh *destination.FileHandler) error {
+ if fh.Size != l.size {
+ return fmt.Errorf("LFSObject: expected size %d, wrote %d", l.size, fh.Size)
+ }
+
+ if fh.SHA256() != l.oid {
+ return fmt.Errorf("LFSObject: expected sha256 %s, got %s", l.oid, fh.SHA256())
+ }
+
+ return nil
+}
+
+type uploadPreparer struct {
+ objectPreparer Preparer
+}
+
+// NewLfsPreparer returns a new preparer instance which adds the capability
+// to a wrapped preparer to set options required for an LFS upload.
+func NewLfsPreparer(c config.Config, objectPreparer Preparer) Preparer {
+ return &uploadPreparer{objectPreparer: objectPreparer}
+}
+
+func (l *uploadPreparer) Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) {
+ opts, _, err := l.objectPreparer.Prepare(a)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ opts.TempFilePrefix = a.LfsOid
+
+ return opts, &object{oid: a.LfsOid, size: a.LfsSize}, nil
+}
diff --git a/workhorse/internal/upload/lfs_preparer_test.go b/workhorse/internal/upload/lfs_preparer_test.go
new file mode 100644
index 00000000000..6be4a7c2955
--- /dev/null
+++ b/workhorse/internal/upload/lfs_preparer_test.go
@@ -0,0 +1,59 @@
+package upload
+
+import (
+ "testing"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestLfsPreparerWithConfig(t *testing.T) {
+ lfsOid := "abcd1234"
+ creds := config.S3Credentials{
+ AwsAccessKeyID: "test-key",
+ AwsSecretAccessKey: "test-secret",
+ }
+
+ c := config.Config{
+ ObjectStorageCredentials: config.ObjectStorageCredentials{
+ Provider: "AWS",
+ S3Credentials: creds,
+ },
+ }
+
+ r := &api.Response{
+ LfsOid: lfsOid,
+ RemoteObject: api.RemoteObject{
+ ID: "the upload ID",
+ UseWorkhorseClient: true,
+ ObjectStorage: &api.ObjectStorageParams{
+ Provider: "AWS",
+ },
+ },
+ }
+
+ uploadPreparer := NewObjectStoragePreparer(c)
+ lfsPreparer := NewLfsPreparer(c, uploadPreparer)
+ opts, verifier, err := lfsPreparer.Prepare(r)
+
+ require.NoError(t, err)
+ require.Equal(t, lfsOid, opts.TempFilePrefix)
+ require.True(t, opts.ObjectStorageConfig.IsAWS())
+ require.True(t, opts.UseWorkhorseClient)
+ require.Equal(t, creds, opts.ObjectStorageConfig.S3Credentials)
+ require.NotNil(t, verifier)
+}
+
+func TestLfsPreparerWithNoConfig(t *testing.T) {
+ c := config.Config{}
+ r := &api.Response{RemoteObject: api.RemoteObject{ID: "the upload ID"}}
+ uploadPreparer := NewObjectStoragePreparer(c)
+ lfsPreparer := NewLfsPreparer(c, uploadPreparer)
+ opts, verifier, err := lfsPreparer.Prepare(r)
+
+ require.NoError(t, err)
+ require.False(t, opts.UseWorkhorseClient)
+ require.NotNil(t, verifier)
+}
diff --git a/workhorse/internal/upload/accelerate.go b/workhorse/internal/upload/multipart_uploader.go
index 28d3b3dee2e..d0097f9e153 100644
--- a/workhorse/internal/upload/accelerate.go
+++ b/workhorse/internal/upload/multipart_uploader.go
@@ -4,19 +4,10 @@ import (
"fmt"
"net/http"
- "github.com/golang-jwt/jwt/v4"
-
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
)
-const RewrittenFieldsHeader = "Gitlab-Workhorse-Multipart-Fields"
-
-type MultipartClaims struct {
- RewrittenFields map[string]string `json:"rewritten_fields"`
- jwt.StandardClaims
-}
-
// Multipart is a request middleware. If the request has a MIME multipart
// request body, the middleware will iterate through the multipart parts.
// When it finds a file part (filename != ""), the middleware will save
@@ -32,6 +23,6 @@ func Multipart(rails PreAuthorizer, h http.Handler, p Preparer) http.Handler {
return
}
- InterceptMultipartFiles(w, r, h, a, s, opts)
+ interceptMultipartFiles(w, r, h, a, s, opts)
}, "/authorize")
}
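
For context, Multipart composes with a PreAuthorizer (defined in uploads.go below) and a Preparer. A hypothetical wiring sketch, where rails and upstream are assumptions supplied by the caller:

package main

import (
	"net/http"

	"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload"
)

// buildUploadHandler is an illustrative sketch: rails is any
// upload.PreAuthorizer, upstream is the handler that receives the
// rewritten multipart request after authorization succeeds.
func buildUploadHandler(rails upload.PreAuthorizer, upstream http.Handler) http.Handler {
	return upload.Multipart(rails, upstream, &upload.DefaultPreparer{})
}
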
diff --git a/workhorse/internal/upload/object_storage_preparer.go b/workhorse/internal/upload/object_storage_preparer.go
index 241cfc2d9d2..f28f598c895 100644
--- a/workhorse/internal/upload/object_storage_preparer.go
+++ b/workhorse/internal/upload/object_storage_preparer.go
@@ -3,7 +3,7 @@ package upload
import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
)
type ObjectStoragePreparer struct {
@@ -11,12 +11,15 @@ type ObjectStoragePreparer struct {
credentials config.ObjectStorageCredentials
}
+// NewObjectStoragePreparer returns a new preparer instance which is responsible for
+// setting the object storage credentials and settings needed by an uploader
+// to upload to object storage.
func NewObjectStoragePreparer(c config.Config) Preparer {
return &ObjectStoragePreparer{credentials: c.ObjectStorageCredentials, config: c.ObjectStorageConfig}
}
-func (p *ObjectStoragePreparer) Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error) {
- opts, err := filestore.GetOpts(a)
+func (p *ObjectStoragePreparer) Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) {
+ opts, err := destination.GetOpts(a)
if err != nil {
return nil, nil, err
}
diff --git a/workhorse/internal/upload/preparer.go b/workhorse/internal/upload/preparer.go
new file mode 100644
index 00000000000..46a4cac01b5
--- /dev/null
+++ b/workhorse/internal/upload/preparer.go
@@ -0,0 +1,33 @@
+package upload
+
+import (
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
+)
+
+// Verifier is an optional pluggable behavior for upload paths. If
+// Verify() returns an error, Workhorse will return an error response to
+// the client instead of propagating the request to Rails. The motivating
+// use case is Git LFS, where Workhorse checks the size and SHA256
+// checksum of the uploaded file.
+type Verifier interface {
+ // Verify can abort the upload by returning an error
+ Verify(handler *destination.FileHandler) error
+}
+
+// Preparer is a pluggable behavior that interprets a Rails API response
+// and either tells Workhorse how to handle the upload, via the
+// UploadOpts and Verifier, or it rejects the request by returning a
+// non-nil error. Its intended use is to make sure the upload gets stored
+// in the right location: either a local directory, or one of several
+// supported object storage backends.
+type Preparer interface {
+ Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error)
+}
+
+type DefaultPreparer struct{}
+
+func (s *DefaultPreparer) Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) {
+ opts, err := destination.GetOpts(a)
+ return opts, nil, err
+}
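
As an illustration of this pluggability, a hypothetical preparer could tighten the size limit and verify it after the upload completes (maxSizePreparer and maxSizeVerifier are invented names, not part of Workhorse):

package upload

import (
	"fmt"

	"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
	"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
)

// maxSizePreparer wraps the default option handling with a size cap.
type maxSizePreparer struct {
	limit int64
}

func (p *maxSizePreparer) Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) {
	opts, err := destination.GetOpts(a)
	if err != nil {
		return nil, nil, err
	}
	opts.MaximumSize = p.limit
	return opts, &maxSizeVerifier{limit: p.limit}, nil
}

// maxSizeVerifier rejects the finished upload if it exceeded the cap.
type maxSizeVerifier struct {
	limit int64
}

func (v *maxSizeVerifier) Verify(handler *destination.FileHandler) error {
	if handler.Size > v.limit {
		return fmt.Errorf("upload of %d bytes exceeds limit of %d", handler.Size, v.limit)
	}
	return nil
}
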
diff --git a/workhorse/internal/upload/rewrite.go b/workhorse/internal/upload/rewrite.go
index bbabe840ef5..ff5190226af 100644
--- a/workhorse/internal/upload/rewrite.go
+++ b/workhorse/internal/upload/rewrite.go
@@ -21,8 +21,8 @@ import (
"golang.org/x/image/tiff"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/lsif_transformer/parser"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/exif"
)
@@ -68,7 +68,7 @@ type rewriter struct {
finalizedFields map[string]bool
}
-func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, preauth *api.Response, filter MultipartFormProcessor, opts *filestore.SaveFileOpts) error {
+func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, preauth *api.Response, filter MultipartFormProcessor, opts *destination.UploadOpts) error {
// Create multipart reader
reader, err := r.MultipartReader()
if err != nil {
@@ -128,7 +128,7 @@ func parseAndNormalizeContentDisposition(header textproto.MIMEHeader) (string, s
return params["name"], params["filename"]
}
-func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipart.Part, opts *filestore.SaveFileOpts) error {
+func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipart.Part, opts *destination.UploadOpts) error {
if rew.filter.Count() >= maxFilesAllowed {
return ErrTooManyFilesUploaded
}
@@ -164,10 +164,10 @@ func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipa
defer inputReader.Close()
- fh, err := filestore.SaveFileFromReader(ctx, inputReader, -1, opts)
+ fh, err := destination.Upload(ctx, inputReader, -1, opts)
if err != nil {
switch err {
- case filestore.ErrEntityTooLarge, exif.ErrRemovingExif:
+ case destination.ErrEntityTooLarge, exif.ErrRemovingExif:
return err
default:
return fmt.Errorf("persisting multipart file: %v", err)
diff --git a/workhorse/internal/upload/saved_file_tracker.go b/workhorse/internal/upload/saved_file_tracker.go
index e6f9a8c9a88..b70a303a4a4 100644
--- a/workhorse/internal/upload/saved_file_tracker.go
+++ b/workhorse/internal/upload/saved_file_tracker.go
@@ -6,8 +6,8 @@ import (
"mime/multipart"
"net/http"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/secret"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
)
type SavedFileTracker struct {
@@ -26,7 +26,7 @@ func (s *SavedFileTracker) Count() int {
return len(s.rewrittenFields)
}
-func (s *SavedFileTracker) ProcessFile(_ context.Context, fieldName string, file *filestore.FileHandler, _ *multipart.Writer) error {
+func (s *SavedFileTracker) ProcessFile(_ context.Context, fieldName string, file *destination.FileHandler, _ *multipart.Writer) error {
if _, ok := s.rewrittenFields[fieldName]; ok {
return fmt.Errorf("the %v field has already been processed", fieldName)
}
diff --git a/workhorse/internal/upload/saved_file_tracker_test.go b/workhorse/internal/upload/saved_file_tracker_test.go
index ba927db253e..4f323bf8888 100644
--- a/workhorse/internal/upload/saved_file_tracker_test.go
+++ b/workhorse/internal/upload/saved_file_tracker_test.go
@@ -10,8 +10,8 @@ import (
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
)
func TestSavedFileTracking(t *testing.T) {
@@ -23,7 +23,7 @@ func TestSavedFileTracking(t *testing.T) {
tracker := SavedFileTracker{Request: r}
require.Equal(t, "accelerate", tracker.Name())
- file := &filestore.FileHandler{}
+ file := &destination.FileHandler{}
ctx := context.Background()
tracker.ProcessFile(ctx, "test", file, nil)
require.Equal(t, 1, tracker.Count())
@@ -40,7 +40,7 @@ func TestSavedFileTracking(t *testing.T) {
func TestDuplicatedFileProcessing(t *testing.T) {
tracker := SavedFileTracker{}
- file := &filestore.FileHandler{}
+ file := &destination.FileHandler{}
require.NoError(t, tracker.ProcessFile(context.Background(), "file", file, nil))
diff --git a/workhorse/internal/upload/uploads.go b/workhorse/internal/upload/uploads.go
index 1806e7563ce..8272a3d920d 100644
--- a/workhorse/internal/upload/uploads.go
+++ b/workhorse/internal/upload/uploads.go
@@ -8,27 +8,39 @@ import (
"mime/multipart"
"net/http"
+ "github.com/golang-jwt/jwt/v4"
+
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/exif"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/zipartifacts"
)
+const RewrittenFieldsHeader = "Gitlab-Workhorse-Multipart-Fields"
+
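+// PreAuthorizer provides the pre-authorization round trip to Rails
+// that must succeed before Workhorse handles an upload.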
+type PreAuthorizer interface {
+ PreAuthorizeHandler(next api.HandleFunc, suffix string) http.Handler
+}
+
+type MultipartClaims struct {
+ RewrittenFields map[string]string `json:"rewritten_fields"`
+ jwt.StandardClaims
+}
+
// MultipartFormProcessor abstracts away implementation differences
// between generic MIME multipart file uploads and CI artifact uploads.
type MultipartFormProcessor interface {
- ProcessFile(ctx context.Context, formName string, file *filestore.FileHandler, writer *multipart.Writer) error
+ ProcessFile(ctx context.Context, formName string, file *destination.FileHandler, writer *multipart.Writer) error
ProcessField(ctx context.Context, formName string, writer *multipart.Writer) error
Finalize(ctx context.Context) error
Name() string
Count() int
}
-// InterceptMultipartFiles is the core of the implementation of
-// Multipart. Because it is also used for CI artifact uploads it is a
-// public function.
-func InterceptMultipartFiles(w http.ResponseWriter, r *http.Request, h http.Handler, preauth *api.Response, filter MultipartFormProcessor, opts *filestore.SaveFileOpts) {
+// interceptMultipartFiles is the core of the implementation of
+// Multipart.
+func interceptMultipartFiles(w http.ResponseWriter, r *http.Request, h http.Handler, preauth *api.Response, filter MultipartFormProcessor, opts *destination.UploadOpts) {
var body bytes.Buffer
writer := multipart.NewWriter(&body)
defer writer.Close()
@@ -43,7 +55,7 @@ func InterceptMultipartFiles(w http.ResponseWriter, r *http.Request, h http.Hand
helper.CaptureAndFail(w, r, err, err.Error(), http.StatusBadRequest)
case http.ErrNotMultipart:
h.ServeHTTP(w, r)
- case filestore.ErrEntityTooLarge:
+ case destination.ErrEntityTooLarge:
helper.RequestEntityTooLarge(w, r, err)
case zipartifacts.ErrBadMetadata:
helper.RequestEntityTooLarge(w, r, err)
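
The MultipartClaims moved here are serialized into a signed JWT that travels to Rails in the Gitlab-Workhorse-Multipart-Fields header. A minimal signing sketch, assuming a placeholder jwtSecret (the real shared secret comes from the internal/secret package):

package upload

import (
	"net/http"
	"time"

	"github.com/golang-jwt/jwt/v4"
)

// signRewrittenFields is a hypothetical helper: it signs the rewritten
// field map and attaches it to the outgoing request header.
func signRewrittenFields(r *http.Request, fields map[string]string, jwtSecret []byte) error {
	claims := MultipartClaims{
		RewrittenFields: fields,
		StandardClaims:  jwt.StandardClaims{IssuedAt: time.Now().Unix()},
	}

	tokenString, err := jwt.NewWithClaims(jwt.SigningMethodHS256, claims).SignedString(jwtSecret)
	if err != nil {
		return err
	}

	r.Header.Set(RewrittenFieldsHeader, tokenString)
	return nil
}
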
diff --git a/workhorse/internal/upload/uploads_test.go b/workhorse/internal/upload/uploads_test.go
index f262bf94b08..9d787b10d1c 100644
--- a/workhorse/internal/upload/uploads_test.go
+++ b/workhorse/internal/upload/uploads_test.go
@@ -22,9 +22,9 @@ import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore/test"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/proxy"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upstream/roundtripper"
)
@@ -78,7 +78,7 @@ func TestUploadHandlerForwardingRawData(t *testing.T) {
opts, _, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
- InterceptMultipartFiles(response, httpRequest, handler, apiResponse, nil, opts)
+ interceptMultipartFiles(response, httpRequest, handler, apiResponse, nil, opts)
require.Equal(t, 202, response.Code)
require.Equal(t, "RESPONSE", response.Body.String(), "response body")
@@ -149,7 +149,7 @@ func TestUploadHandlerRewritingMultiPartData(t *testing.T) {
opts, _, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
- InterceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts)
+ interceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts)
require.Equal(t, 202, response.Code)
cancel() // this will trigger an async cleanup
@@ -218,7 +218,7 @@ func TestUploadHandlerDetectingInjectedMultiPartData(t *testing.T) {
opts, _, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
- InterceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts)
+ interceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts)
require.Equal(t, test.response, response.Code)
cancel() // this will trigger an async cleanup
@@ -248,7 +248,7 @@ func TestUploadProcessingField(t *testing.T) {
opts, _, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
- InterceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts)
+ interceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts)
require.Equal(t, 500, response.Code)
}
@@ -279,7 +279,7 @@ func TestUploadingMultipleFiles(t *testing.T) {
opts, _, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
- InterceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts)
+ interceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts)
require.Equal(t, 400, response.Code)
require.Equal(t, "upload request contains more than 10 files\n", response.Body.String())
@@ -335,7 +335,7 @@ func TestUploadProcessingFile(t *testing.T) {
opts, _, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
- InterceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts)
+ interceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts)
require.Equal(t, 200, response.Code)
})
@@ -381,7 +381,7 @@ func TestInvalidFileNames(t *testing.T) {
opts, _, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
- InterceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &SavedFileTracker{Request: httpRequest}, opts)
+ interceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &SavedFileTracker{Request: httpRequest}, opts)
require.Equal(t, testCase.code, response.Code)
require.Equal(t, testCase.expectedPrefix, opts.TempFilePrefix)
}
@@ -447,7 +447,7 @@ func TestContentDispositionRewrite(t *testing.T) {
opts, _, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
- InterceptMultipartFiles(response, httpRequest, customHandler, apiResponse, &SavedFileTracker{Request: httpRequest}, opts)
+ interceptMultipartFiles(response, httpRequest, customHandler, apiResponse, &SavedFileTracker{Request: httpRequest}, opts)
upstreamRequest, err := http.ReadRequest(bufio.NewReader(&upstreamRequestBuffer))
require.NoError(t, err)
@@ -570,7 +570,7 @@ func runUploadTest(t *testing.T, image []byte, filename string, httpCode int, ts
opts, _, err := preparer.Prepare(apiResponse)
require.NoError(t, err)
- InterceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts)
+ interceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts)
require.Equal(t, httpCode, response.Code)
}