Diffstat (limited to 'workhorse/internal/upload')
44 files changed, 4450 insertions, 78 deletions
diff --git a/workhorse/internal/upload/artifacts_store_test.go b/workhorse/internal/upload/artifacts_store_test.go
new file mode 100644
index 00000000000..97e66fc37a4
--- /dev/null
+++ b/workhorse/internal/upload/artifacts_store_test.go
@@ -0,0 +1,335 @@
+package upload
+
+import (
+	"archive/zip"
+	"bytes"
+	"crypto/md5"
+	"encoding/hex"
+	"fmt"
+	"io/ioutil"
+	"mime/multipart"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
+	"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
+	"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
+)
+
+func createTestZipArchive(t *testing.T) (data []byte, md5Hash string) {
+	var buffer bytes.Buffer
+	archive := zip.NewWriter(&buffer)
+	fileInArchive, err := archive.Create("test.file")
+	require.NoError(t, err)
+	fmt.Fprint(fileInArchive, "test")
+	archive.Close()
+	data = buffer.Bytes()
+
+	hasher := md5.New()
+	hasher.Write(data)
+	hexHash := hasher.Sum(nil)
+	md5Hash = hex.EncodeToString(hexHash)
+
+	return data, md5Hash
+}
+
+func createTestMultipartForm(t *testing.T, data []byte) (bytes.Buffer, string) {
+	var buffer bytes.Buffer
+	writer := multipart.NewWriter(&buffer)
+	file, err := writer.CreateFormFile("file", "my.file")
+	require.NoError(t, err)
+	file.Write(data)
+	writer.Close()
+	return buffer, writer.FormDataContentType()
+}
+
+func testUploadArtifactsFromTestZip(t *testing.T, ts *httptest.Server) *httptest.ResponseRecorder {
+	archiveData, _ := createTestZipArchive(t)
+	contentBuffer, contentType := createTestMultipartForm(t, archiveData)
+
+	return testUploadArtifacts(t, contentType, ts.URL+Path, &contentBuffer)
+}
+
+func TestUploadHandlerSendingToExternalStorage(t *testing.T) {
+	tempPath, err := ioutil.TempDir("", "uploads")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(tempPath)
+
+	archiveData, md5 := createTestZipArchive(t)
+	archiveFile, err := ioutil.TempFile("", "artifact.zip")
+	require.NoError(t, err)
+	defer os.Remove(archiveFile.Name())
+	_, err = archiveFile.Write(archiveData)
+	require.NoError(t, err)
+	archiveFile.Close()
+
+	storeServerCalled := 0
+	storeServerMux := http.NewServeMux()
+	storeServerMux.HandleFunc("/url/put", func(w http.ResponseWriter, r *http.Request) {
+		require.Equal(t, "PUT", r.Method)
+
+		receivedData, err := ioutil.ReadAll(r.Body)
+		require.NoError(t, err)
+		require.Equal(t, archiveData, receivedData)
+
+		storeServerCalled++
+		w.Header().Set("ETag", md5)
+		w.WriteHeader(200)
+	})
+	storeServerMux.HandleFunc("/store-id", func(w http.ResponseWriter, r *http.Request) {
+		http.ServeFile(w, r, archiveFile.Name())
+	})
+
+	responseProcessorCalled := 0
+	responseProcessor := func(w http.ResponseWriter, r *http.Request) {
+		require.Equal(t, "store-id", r.FormValue("file.remote_id"))
+		require.NotEmpty(t, r.FormValue("file.remote_url"))
+		w.WriteHeader(200)
+		responseProcessorCalled++
+	}
+
+	storeServer := httptest.NewServer(storeServerMux)
+	defer storeServer.Close()
+
+	qs := fmt.Sprintf("?%s=%s", ArtifactFormatKey, ArtifactFormatZip)
+
+	tests := []struct {
+		name    string
+		preauth *api.Response
+	}{
+		{
+			name: "ObjectStore Upload",
+			preauth: &api.Response{
+				RemoteObject: api.RemoteObject{
+					StoreURL: storeServer.URL + "/url/put" + qs,
+					ID:       "store-id",
+					GetURL:   storeServer.URL + "/store-id",
+				},
+			},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			storeServerCalled = 0
+			responseProcessorCalled = 0
+
+			ts := testArtifactsUploadServer(t, test.preauth, responseProcessor)
+			defer ts.Close()
+
+			contentBuffer, contentType := createTestMultipartForm(t, archiveData)
+			response := testUploadArtifacts(t, contentType, ts.URL+Path+qs, &contentBuffer)
+			require.Equal(t, http.StatusOK, response.Code)
+			testhelper.RequireResponseHeader(t, response, MetadataHeaderKey, MetadataHeaderPresent)
+			require.Equal(t, 1, storeServerCalled, "store should be called only once")
+			require.Equal(t, 1, responseProcessorCalled, "response processor should be called only once")
+		})
+	}
+}
+
+func TestUploadHandlerSendingToExternalStorageAndStorageServerUnreachable(t *testing.T) {
+	tempPath, err := ioutil.TempDir("", "uploads")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(tempPath)
+
+	responseProcessor := func(w http.ResponseWriter, r *http.Request) {
+		t.Fatal("it should not be called")
+	}
+
+	authResponse := &api.Response{
+		TempPath: tempPath,
+		RemoteObject: api.RemoteObject{
+			StoreURL: "http://localhost:12323/invalid/url",
+			ID:       "store-id",
+		},
+	}
+
+	ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
+	defer ts.Close()
+
+	response := testUploadArtifactsFromTestZip(t, ts)
+	require.Equal(t, http.StatusInternalServerError, response.Code)
+}
+
+func TestUploadHandlerSendingToExternalStorageAndInvalidURLIsUsed(t *testing.T) {
+	tempPath, err := ioutil.TempDir("", "uploads")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(tempPath)
+
+	responseProcessor := func(w http.ResponseWriter, r *http.Request) {
+		t.Fatal("it should not be called")
+	}
+
+	authResponse := &api.Response{
+		TempPath: tempPath,
+		RemoteObject: api.RemoteObject{
+			StoreURL: "htt:////invalid-url",
+			ID:       "store-id",
+		},
+	}
+
+	ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
+	defer ts.Close()
+
+	response := testUploadArtifactsFromTestZip(t, ts)
+	require.Equal(t, http.StatusInternalServerError, response.Code)
+}
+
+func TestUploadHandlerSendingToExternalStorageAndItReturnsAnError(t *testing.T) {
+	putCalledTimes := 0
+
+	storeServerMux := http.NewServeMux()
+	storeServerMux.HandleFunc("/url/put", func(w http.ResponseWriter, r *http.Request) {
+		putCalledTimes++
+		require.Equal(t, "PUT", r.Method)
+		w.WriteHeader(510)
+	})
+
+	responseProcessor := func(w http.ResponseWriter, r *http.Request) {
+		t.Fatal("it should not be called")
+	}
+
+	storeServer := httptest.NewServer(storeServerMux)
+	defer storeServer.Close()
+
+	authResponse := &api.Response{
+		RemoteObject: api.RemoteObject{
+			StoreURL: storeServer.URL + "/url/put",
+			ID:       "store-id",
+		},
+	}
+
+	ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
+	defer ts.Close()
+
+	response := testUploadArtifactsFromTestZip(t, ts)
+	require.Equal(t, http.StatusInternalServerError, response.Code)
+	require.Equal(t, 1, putCalledTimes, "upload should be called only once")
+}
+
+func TestUploadHandlerSendingToExternalStorageAndSupportRequestTimeout(t *testing.T) {
+	putCalledTimes := 0
+
+	storeServerMux := http.NewServeMux()
+	storeServerMux.HandleFunc("/url/put", func(w http.ResponseWriter, r *http.Request) {
+		putCalledTimes++
+		require.Equal(t, "PUT", r.Method)
+		time.Sleep(10 * time.Second)
+		w.WriteHeader(510)
+	})
+
+	responseProcessor := func(w http.ResponseWriter, r *http.Request) {
+		t.Fatal("it should not be called")
+	}
+
+	storeServer := httptest.NewServer(storeServerMux)
+	defer storeServer.Close()
+
+	authResponse := &api.Response{
+		RemoteObject: api.RemoteObject{
+			StoreURL: storeServer.URL + "/url/put",
+			ID:       "store-id",
+			Timeout:  1,
+		},
+	}
+
+	ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
+	defer ts.Close()
+
+	response := testUploadArtifactsFromTestZip(t, ts)
+	require.Equal(t, http.StatusInternalServerError, response.Code)
+	require.Equal(t, 1, putCalledTimes, "upload should be called only once")
+}
+
+func TestUploadHandlerMultipartUploadSizeLimit(t *testing.T) {
+	os, server := test.StartObjectStore()
+	defer server.Close()
+
+	err := os.InitiateMultipartUpload(test.ObjectPath)
+	require.NoError(t, err)
+
+	objectURL := server.URL + test.ObjectPath
+
+	uploadSize := 10
+	preauth := &api.Response{
+		RemoteObject: api.RemoteObject{
+			ID: "store-id",
+			MultipartUpload: &api.MultipartUploadParams{
+				PartSize:    1,
+				PartURLs:    []string{objectURL + "?partNumber=1"},
+				AbortURL:    objectURL, // DELETE
+				CompleteURL: objectURL, // POST
+			},
+		},
+	}
+
+	responseProcessor := func(w http.ResponseWriter, r *http.Request) {
+		t.Fatal("it should not be called")
+	}
+
+	ts := testArtifactsUploadServer(t, preauth, responseProcessor)
+	defer ts.Close()
+
+	contentBuffer, contentType := createTestMultipartForm(t, make([]byte, uploadSize))
+	response := testUploadArtifacts(t, contentType, ts.URL+Path, &contentBuffer)
+	require.Equal(t, http.StatusRequestEntityTooLarge, response.Code)
+	require.Eventually(t, func() bool {
+		return !os.IsMultipartUpload(test.ObjectPath)
+	}, time.Second, time.Millisecond, "MultipartUpload should not be in progress anymore")
+	require.Empty(t, os.GetObjectMD5(test.ObjectPath), "upload should have failed, so the object should not exist")
+}
+
+func TestUploadHandlerMultipartUploadMaximumSizeFromApi(t *testing.T) {
+	os, server := test.StartObjectStore()
+	defer server.Close()
+
+	err := os.InitiateMultipartUpload(test.ObjectPath)
+	require.NoError(t, err)
+
+	objectURL := server.URL + test.ObjectPath
+
+	uploadSize := int64(10)
+	maxSize := uploadSize - 1
+	preauth := &api.Response{
+		MaximumSize: maxSize,
+		RemoteObject: api.RemoteObject{
+			ID: "store-id",
+			MultipartUpload: &api.MultipartUploadParams{
+				PartSize:    uploadSize,
+				PartURLs:    []string{objectURL + "?partNumber=1"},
+				AbortURL:    objectURL, // DELETE
+				CompleteURL: objectURL, // POST
+			},
+		},
+	}
+
+	responseProcessor := func(w http.ResponseWriter, r *http.Request) {
+		t.Fatal("it should not be called")
+	}
+
+	ts := testArtifactsUploadServer(t, preauth, responseProcessor)
+	defer ts.Close()
+
+	contentBuffer, contentType := createTestMultipartForm(t, make([]byte, uploadSize))
+	response := testUploadArtifacts(t, contentType, ts.URL+Path, &contentBuffer)
+	require.Equal(t, http.StatusRequestEntityTooLarge, response.Code)
+
+	testhelper.Retry(t, 5*time.Second, func() error {
+		if os.GetObjectMD5(test.ObjectPath) == "" {
+			return nil
+		}
+
+		return fmt.Errorf("file is still present")
+	})
+}
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upstream/roundtripper" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/zipartifacts" + + "github.com/stretchr/testify/require" +) + +const ( + MetadataHeaderKey = "Metadata-Status" + MetadataHeaderPresent = "present" + MetadataHeaderMissing = "missing" + Path = "/url/path" +) + +func TestMain(m *testing.M) { + if err := testhelper.BuildExecutables(); err != nil { + log.WithError(err).Fatal() + } + + os.Exit(m.Run()) +} + +func testArtifactsUploadServer(t *testing.T, authResponse *api.Response, bodyProcessor func(w http.ResponseWriter, r *http.Request)) *httptest.Server { + mux := http.NewServeMux() + mux.HandleFunc(Path+"/authorize", func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + t.Fatal("Expected POST request") + } + + w.Header().Set("Content-Type", api.ResponseContentType) + + data, err := json.Marshal(&authResponse) + if err != nil { + t.Fatal("Expected to marshal") + } + w.Write(data) + }) + mux.HandleFunc(Path, func(w http.ResponseWriter, r *http.Request) { + opts, err := destination.GetOpts(authResponse) + require.NoError(t, err) + + if r.Method != "POST" { + t.Fatal("Expected POST request") + } + if opts.IsLocal() { + if r.FormValue("file.path") == "" { + t.Fatal("Expected file to be present") + return + } + + _, err := ioutil.ReadFile(r.FormValue("file.path")) + if err != nil { + t.Fatal("Expected file to be readable") + return + } + } else { + if r.FormValue("file.remote_url") == "" { + t.Fatal("Expected file to be remote accessible") + return + } + } + + if r.FormValue("metadata.path") != "" { + metadata, err := ioutil.ReadFile(r.FormValue("metadata.path")) + if err != nil { + t.Fatal("Expected metadata to be readable") + return + } + gz, err := gzip.NewReader(bytes.NewReader(metadata)) + if err != nil { + t.Fatal("Expected metadata to be valid gzip") + return + } + defer gz.Close() + metadata, err = ioutil.ReadAll(gz) + if err != nil { + t.Fatal("Expected metadata to be valid") + return + } + if !bytes.HasPrefix(metadata, []byte(zipartifacts.MetadataHeaderPrefix+zipartifacts.MetadataHeader)) { + t.Fatal("Expected metadata to be of valid format") + return + } + + w.Header().Set(MetadataHeaderKey, MetadataHeaderPresent) + + } else { + w.Header().Set(MetadataHeaderKey, MetadataHeaderMissing) + } + + w.WriteHeader(http.StatusOK) + + if bodyProcessor != nil { + bodyProcessor(w, r) + } + }) + return testhelper.TestServerWithHandler(nil, mux.ServeHTTP) +} + +type testServer struct { + url string + writer *multipart.Writer + buffer *bytes.Buffer + fileWriter io.Writer + cleanup func() +} + +func setupWithTmpPath(t *testing.T, filename string, includeFormat bool, format string, authResponse *api.Response, bodyProcessor func(w http.ResponseWriter, r *http.Request)) *testServer { + tempPath, err := ioutil.TempDir("", "uploads") + require.NoError(t, err) + + if authResponse == nil { + authResponse = &api.Response{TempPath: tempPath} + } + + ts := testArtifactsUploadServer(t, authResponse, bodyProcessor) + + var buffer bytes.Buffer + writer := multipart.NewWriter(&buffer) + fileWriter, err := writer.CreateFormFile(filename, "my.file") + require.NotNil(t, fileWriter) + require.NoError(t, err) + + cleanup := func() { + ts.Close() + require.NoError(t, os.RemoveAll(tempPath)) + require.NoError(t, writer.Close()) + } + + qs := "" + + if includeFormat { + qs = fmt.Sprintf("?%s=%s", 
ArtifactFormatKey, format) + } + + return &testServer{url: ts.URL + Path + qs, writer: writer, buffer: &buffer, fileWriter: fileWriter, cleanup: cleanup} +} + +func testUploadArtifacts(t *testing.T, contentType, url string, body io.Reader) *httptest.ResponseRecorder { + httpRequest, err := http.NewRequest("POST", url, body) + require.NoError(t, err) + + httpRequest.Header.Set("Content-Type", contentType) + response := httptest.NewRecorder() + parsedURL := helper.URLMustParse(url) + roundTripper := roundtripper.NewTestBackendRoundTripper(parsedURL) + testhelper.ConfigureSecret() + apiClient := api.NewAPI(parsedURL, "123", roundTripper) + proxyClient := proxy.NewProxy(parsedURL, "123", roundTripper) + Artifacts(apiClient, proxyClient, &DefaultPreparer{}).ServeHTTP(response, httpRequest) + return response +} + +func TestUploadHandlerAddingMetadata(t *testing.T) { + testCases := []struct { + desc string + format string + includeFormat bool + }{ + { + desc: "ZIP format", + format: ArtifactFormatZip, + includeFormat: true, + }, + { + desc: "default format", + format: ArtifactFormatDefault, + includeFormat: true, + }, + { + desc: "default format without artifact_format", + format: ArtifactFormatDefault, + includeFormat: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + s := setupWithTmpPath(t, "file", tc.includeFormat, tc.format, nil, + func(w http.ResponseWriter, r *http.Request) { + token, err := jwt.ParseWithClaims(r.Header.Get(RewrittenFieldsHeader), &MultipartClaims{}, testhelper.ParseJWT) + require.NoError(t, err) + + rewrittenFields := token.Claims.(*MultipartClaims).RewrittenFields + require.Equal(t, 2, len(rewrittenFields)) + + require.Contains(t, rewrittenFields, "file") + require.Contains(t, rewrittenFields, "metadata") + require.Contains(t, r.PostForm, "file.gitlab-workhorse-upload") + require.Contains(t, r.PostForm, "metadata.gitlab-workhorse-upload") + }, + ) + defer s.cleanup() + + archive := zip.NewWriter(s.fileWriter) + file, err := archive.Create("test.file") + require.NotNil(t, file) + require.NoError(t, err) + + require.NoError(t, archive.Close()) + require.NoError(t, s.writer.Close()) + + response := testUploadArtifacts(t, s.writer.FormDataContentType(), s.url, s.buffer) + require.Equal(t, http.StatusOK, response.Code) + testhelper.RequireResponseHeader(t, response, MetadataHeaderKey, MetadataHeaderPresent) + }) + } +} + +func TestUploadHandlerTarArtifact(t *testing.T) { + s := setupWithTmpPath(t, "file", true, "tar", nil, + func(w http.ResponseWriter, r *http.Request) { + token, err := jwt.ParseWithClaims(r.Header.Get(RewrittenFieldsHeader), &MultipartClaims{}, testhelper.ParseJWT) + require.NoError(t, err) + + rewrittenFields := token.Claims.(*MultipartClaims).RewrittenFields + require.Equal(t, 1, len(rewrittenFields)) + + require.Contains(t, rewrittenFields, "file") + require.Contains(t, r.PostForm, "file.gitlab-workhorse-upload") + }, + ) + defer s.cleanup() + + file, err := os.Open("../../testdata/tarfile.tar") + require.NoError(t, err) + + _, err = io.Copy(s.fileWriter, file) + require.NoError(t, err) + require.NoError(t, file.Close()) + require.NoError(t, s.writer.Close()) + + response := testUploadArtifacts(t, s.writer.FormDataContentType(), s.url, s.buffer) + require.Equal(t, http.StatusOK, response.Code) + testhelper.RequireResponseHeader(t, response, MetadataHeaderKey, MetadataHeaderMissing) +} + +func TestUploadHandlerForUnsupportedArchive(t *testing.T) { + s := setupWithTmpPath(t, "file", true, "other", nil, nil) + defer 
s.cleanup() + require.NoError(t, s.writer.Close()) + + response := testUploadArtifacts(t, s.writer.FormDataContentType(), s.url, s.buffer) + require.Equal(t, http.StatusOK, response.Code) + testhelper.RequireResponseHeader(t, response, MetadataHeaderKey, MetadataHeaderMissing) +} + +func TestUploadHandlerForMultipleFiles(t *testing.T) { + s := setupWithTmpPath(t, "file", true, "", nil, nil) + defer s.cleanup() + + file, err := s.writer.CreateFormFile("file", "my.file") + require.NotNil(t, file) + require.NoError(t, err) + require.NoError(t, s.writer.Close()) + + response := testUploadArtifacts(t, s.writer.FormDataContentType(), s.url, s.buffer) + require.Equal(t, http.StatusInternalServerError, response.Code) +} + +func TestUploadFormProcessing(t *testing.T) { + s := setupWithTmpPath(t, "metadata", true, "", nil, nil) + defer s.cleanup() + require.NoError(t, s.writer.Close()) + + response := testUploadArtifacts(t, s.writer.FormDataContentType(), s.url, s.buffer) + require.Equal(t, http.StatusInternalServerError, response.Code) +} + +func TestLsifFileProcessing(t *testing.T) { + tempPath, err := ioutil.TempDir("", "uploads") + require.NoError(t, err) + + s := setupWithTmpPath(t, "file", true, "zip", &api.Response{TempPath: tempPath, ProcessLsif: true}, nil) + defer s.cleanup() + + file, err := os.Open("../../testdata/lsif/valid.lsif.zip") + require.NoError(t, err) + + _, err = io.Copy(s.fileWriter, file) + require.NoError(t, err) + require.NoError(t, file.Close()) + require.NoError(t, s.writer.Close()) + + response := testUploadArtifacts(t, s.writer.FormDataContentType(), s.url, s.buffer) + require.Equal(t, http.StatusOK, response.Code) + testhelper.RequireResponseHeader(t, response, MetadataHeaderKey, MetadataHeaderPresent) +} + +func TestInvalidLsifFileProcessing(t *testing.T) { + tempPath, err := ioutil.TempDir("", "uploads") + require.NoError(t, err) + + s := setupWithTmpPath(t, "file", true, "zip", &api.Response{TempPath: tempPath, ProcessLsif: true}, nil) + defer s.cleanup() + + file, err := os.Open("../../testdata/lsif/invalid.lsif.zip") + require.NoError(t, err) + + _, err = io.Copy(s.fileWriter, file) + require.NoError(t, err) + require.NoError(t, file.Close()) + require.NoError(t, s.writer.Close()) + + response := testUploadArtifacts(t, s.writer.FormDataContentType(), s.url, s.buffer) + require.Equal(t, http.StatusInternalServerError, response.Code) +} diff --git a/workhorse/internal/upload/artifacts_uploader.go b/workhorse/internal/upload/artifacts_uploader.go new file mode 100644 index 00000000000..2a91a05fe3d --- /dev/null +++ b/workhorse/internal/upload/artifacts_uploader.go @@ -0,0 +1,167 @@ +package upload + +import ( + "context" + "fmt" + "io" + "mime/multipart" + "net/http" + "os" + "os/exec" + "strings" + "syscall" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "gitlab.com/gitlab-org/labkit/log" + + "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/zipartifacts" +) + +// Sent by the runner: https://gitlab.com/gitlab-org/gitlab-runner/blob/c24da19ecce8808d9d2950896f70c94f5ea1cc2e/network/gitlab.go#L580 +const ( + ArtifactFormatKey = "artifact_format" + ArtifactFormatZip = "zip" + ArtifactFormatDefault = "" +) + +var zipSubcommandsErrorsCounter = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: 
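Aside: the tests above verify the RewrittenFields header, a JWT that Workhorse signs with a secret shared with Rails so the upstream can trust which multipart fields were rewritten. A minimal, self-contained sketch of that sign-and-verify round trip with github.com/golang-jwt/jwt/v4 (not part of this diff; the inline secret and multipartClaims type are assumptions standing in for the real MultipartClaims and testhelper.ConfigureSecret wiring):

package main

import (
	"fmt"

	"github.com/golang-jwt/jwt/v4"
)

type multipartClaims struct {
	RewrittenFields map[string]string `json:"rewritten_fields"`
	jwt.StandardClaims
}

func main() {
	secret := []byte("example-shared-secret") // assumption: any HMAC key shared by both sides

	// Sign, as Workhorse would before forwarding the request upstream.
	claims := multipartClaims{RewrittenFields: map[string]string{"file": "/tmp/file123"}}
	signed, err := jwt.NewWithClaims(jwt.SigningMethodHS256, claims).SignedString(secret)
	if err != nil {
		panic(err)
	}

	// Verify, as the upstream (or a test's body processor) would.
	token, err := jwt.ParseWithClaims(signed, &multipartClaims{}, func(*jwt.Token) (interface{}, error) {
		return secret, nil
	})
	if err != nil || !token.Valid {
		panic(err)
	}
	fmt.Println(token.Claims.(*multipartClaims).RewrittenFields["file"])
}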
"gitlab_workhorse_zip_subcommand_errors_total", + Help: "Errors comming from subcommands used for processing ZIP archives", + }, []string{"error"}) + +type artifactsUploadProcessor struct { + opts *destination.UploadOpts + format string + + SavedFileTracker +} + +// Artifacts is like a Multipart but specific for artifacts upload. +func Artifacts(myAPI *api.API, h http.Handler, p Preparer) http.Handler { + return myAPI.PreAuthorizeHandler(func(w http.ResponseWriter, r *http.Request, a *api.Response) { + opts, _, err := p.Prepare(a) + if err != nil { + helper.Fail500(w, r, fmt.Errorf("UploadArtifacts: error preparing file storage options")) + return + } + + format := r.URL.Query().Get(ArtifactFormatKey) + + mg := &artifactsUploadProcessor{opts: opts, format: format, SavedFileTracker: SavedFileTracker{Request: r}} + interceptMultipartFiles(w, r, h, a, mg, opts) + }, "/authorize") +} + +func (a *artifactsUploadProcessor) generateMetadataFromZip(ctx context.Context, file *destination.FileHandler) (*destination.FileHandler, error) { + metaReader, metaWriter := io.Pipe() + defer metaWriter.Close() + + metaOpts := &destination.UploadOpts{ + LocalTempPath: a.opts.LocalTempPath, + TempFilePrefix: "metadata.gz", + } + if metaOpts.LocalTempPath == "" { + metaOpts.LocalTempPath = os.TempDir() + } + + fileName := file.LocalPath + if fileName == "" { + fileName = file.RemoteURL + } + + zipMd := exec.CommandContext(ctx, "gitlab-zip-metadata", fileName) + zipMd.Stderr = log.ContextLogger(ctx).Writer() + zipMd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + zipMd.Stdout = metaWriter + + if err := zipMd.Start(); err != nil { + return nil, err + } + defer helper.CleanUpProcessGroup(zipMd) + + type saveResult struct { + error + *destination.FileHandler + } + done := make(chan saveResult) + go func() { + var result saveResult + result.FileHandler, result.error = destination.Upload(ctx, metaReader, -1, metaOpts) + + done <- result + }() + + if err := zipMd.Wait(); err != nil { + st, ok := helper.ExitStatus(err) + + if !ok { + return nil, err + } + + zipSubcommandsErrorsCounter.WithLabelValues(zipartifacts.ErrorLabelByCode(st)).Inc() + + if st == zipartifacts.CodeNotZip { + return nil, nil + } + + if st == zipartifacts.CodeLimitsReached { + return nil, zipartifacts.ErrBadMetadata + } + } + + metaWriter.Close() + result := <-done + return result.FileHandler, result.error +} + +func (a *artifactsUploadProcessor) ProcessFile(ctx context.Context, formName string, file *destination.FileHandler, writer *multipart.Writer) error { + // ProcessFile for artifacts requires file form-data field name to eq `file` + + if formName != "file" { + return fmt.Errorf("invalid form field: %q", formName) + } + if a.Count() > 0 { + return fmt.Errorf("artifacts request contains more than one file") + } + a.Track(formName, file.LocalPath) + + select { + case <-ctx.Done(): + return fmt.Errorf("ProcessFile: context done") + default: + } + + if !strings.EqualFold(a.format, ArtifactFormatZip) && a.format != ArtifactFormatDefault { + return nil + } + + // TODO: can we rely on disk for shipping metadata? 
Not if we split workhorse and rails in 2 different PODs + metadata, err := a.generateMetadataFromZip(ctx, file) + if err != nil { + return err + } + + if metadata != nil { + fields, err := metadata.GitLabFinalizeFields("metadata") + if err != nil { + return fmt.Errorf("finalize metadata field error: %v", err) + } + + for k, v := range fields { + writer.WriteField(k, v) + } + + a.Track("metadata", metadata.LocalPath) + } + + return nil +} + +func (a *artifactsUploadProcessor) Name() string { + return "artifacts" +} diff --git a/workhorse/internal/upload/body_uploader.go b/workhorse/internal/upload/body_uploader.go index 6c53bd9241b..d831f9f43a1 100644 --- a/workhorse/internal/upload/body_uploader.go +++ b/workhorse/internal/upload/body_uploader.go @@ -8,41 +8,10 @@ import ( "strings" "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore" "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination" ) -type PreAuthorizer interface { - PreAuthorizeHandler(next api.HandleFunc, suffix string) http.Handler -} - -// Verifier is an optional pluggable behavior for upload paths. If -// Verify() returns an error, Workhorse will return an error response to -// the client instead of propagating the request to Rails. The motivating -// use case is Git LFS, where Workhorse checks the size and SHA256 -// checksum of the uploaded file. -type Verifier interface { - // Verify can abort the upload by returning an error - Verify(handler *filestore.FileHandler) error -} - -// Preparer is a pluggable behavior that interprets a Rails API response -// and either tells Workhorse how to handle the upload, via the -// SaveFileOpts and Verifier, or it rejects the request by returning a -// non-nil error. Its intended use is to make sure the upload gets stored -// in the right location: either a local directory, or one of several -// supported object storage backends. -type Preparer interface { - Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error) -} - -type DefaultPreparer struct{} - -func (s *DefaultPreparer) Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error) { - opts, err := filestore.GetOpts(a) - return opts, nil, err -} - // RequestBody is a request middleware. It will store the request body to // a location by determined an api.Response value. It then forwards the // request to gitlab-rails without the original request body. 
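Aside: generateMetadataFromZip streams a subprocess's stdout through an io.Pipe into a concurrent consumer, so the metadata never has to be fully buffered. A minimal, self-contained sketch of that pattern (not part of this diff; it assumes a POSIX `cat` binary as a stand-in for the gitlab-zip-metadata helper, and a ReadAll goroutine as a stand-in for destination.Upload):

package main

import (
	"context"
	"fmt"
	"io"
	"os/exec"
	"strings"
)

func main() {
	ctx := context.Background()

	r, w := io.Pipe()

	// The subprocess writes into the pipe; `cat` here just copies stdin to stdout.
	cmd := exec.CommandContext(ctx, "cat")
	cmd.Stdin = strings.NewReader("metadata stream")
	cmd.Stdout = w

	// The consumer drains the pipe concurrently, like the destination.Upload
	// goroutine in generateMetadataFromZip; without it, writes would block.
	done := make(chan string)
	go func() {
		b, _ := io.ReadAll(r)
		done <- string(b)
	}()

	if err := cmd.Start(); err != nil {
		panic(err)
	}
	if err := cmd.Wait(); err != nil {
		panic(err)
	}
	w.Close() // signal EOF so the reader goroutine can finish

	fmt.Println(<-done)
}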
diff --git a/workhorse/internal/upload/body_uploader.go b/workhorse/internal/upload/body_uploader.go
index 6c53bd9241b..d831f9f43a1 100644
--- a/workhorse/internal/upload/body_uploader.go
+++ b/workhorse/internal/upload/body_uploader.go
@@ -8,41 +8,10 @@ import (
 	"strings"
 
 	"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
-	"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
 	"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
+	"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
 )
 
-type PreAuthorizer interface {
-	PreAuthorizeHandler(next api.HandleFunc, suffix string) http.Handler
-}
-
-// Verifier is an optional pluggable behavior for upload paths. If
-// Verify() returns an error, Workhorse will return an error response to
-// the client instead of propagating the request to Rails. The motivating
-// use case is Git LFS, where Workhorse checks the size and SHA256
-// checksum of the uploaded file.
-type Verifier interface {
-	// Verify can abort the upload by returning an error
-	Verify(handler *filestore.FileHandler) error
-}
-
-// Preparer is a pluggable behavior that interprets a Rails API response
-// and either tells Workhorse how to handle the upload, via the
-// SaveFileOpts and Verifier, or it rejects the request by returning a
-// non-nil error. Its intended use is to make sure the upload gets stored
-// in the right location: either a local directory, or one of several
-// supported object storage backends.
-type Preparer interface {
-	Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error)
-}
-
-type DefaultPreparer struct{}
-
-func (s *DefaultPreparer) Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error) {
-	opts, err := filestore.GetOpts(a)
-	return opts, nil, err
-}
-
 // RequestBody is a request middleware. It will store the request body to
 // a location determined by an api.Response value. It then forwards the
 // request to gitlab-rails without the original request body.
@@ -54,7 +23,7 @@ func RequestBody(rails PreAuthorizer, h http.Handler, p Preparer) http.Handler {
 			return
 		}
 
-		fh, err := filestore.SaveFileFromReader(r.Context(), r.Body, r.ContentLength, opts)
+		fh, err := destination.Upload(r.Context(), r.Body, r.ContentLength, opts)
 		if err != nil {
 			helper.Fail500(w, r, fmt.Errorf("RequestBody: upload failed: %v", err))
 			return
diff --git a/workhorse/internal/upload/body_uploader_test.go b/workhorse/internal/upload/body_uploader_test.go
index b3d561ac131..47490db8780 100644
--- a/workhorse/internal/upload/body_uploader_test.go
+++ b/workhorse/internal/upload/body_uploader_test.go
@@ -15,8 +15,8 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
-	"gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore"
 	"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
+	"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
 )
 
 const (
@@ -169,8 +169,8 @@ type alwaysLocalPreparer struct {
 	prepareError error
 }
 
-func (a *alwaysLocalPreparer) Prepare(_ *api.Response) (*filestore.SaveFileOpts, Verifier, error) {
-	opts, err := filestore.GetOpts(&api.Response{TempPath: os.TempDir()})
+func (a *alwaysLocalPreparer) Prepare(_ *api.Response) (*destination.UploadOpts, Verifier, error) {
+	opts, err := destination.GetOpts(&api.Response{TempPath: os.TempDir()})
 	if err != nil {
 		return nil, nil, err
 	}
@@ -180,7 +180,7 @@ func (a *alwaysLocalPreparer) Prepare(_ *api.Response) (*filestore.SaveFileOpts,
 
 type alwaysFailsVerifier struct{}
 
-func (alwaysFailsVerifier) Verify(handler *filestore.FileHandler) error {
+func (alwaysFailsVerifier) Verify(handler *destination.FileHandler) error {
 	return fmt.Errorf("Verification failed")
 }
 
@@ -188,7 +188,7 @@ type mockVerifier struct {
 	invoked bool
 }
 
-func (m *mockVerifier) Verify(handler *filestore.FileHandler) error {
+func (m *mockVerifier) Verify(handler *destination.FileHandler) error {
 	m.invoked = true
 
 	return nil
diff --git a/workhorse/internal/upload/destination/destination.go b/workhorse/internal/upload/destination/destination.go
new file mode 100644
index 00000000000..7a030e59a64
--- /dev/null
+++ b/workhorse/internal/upload/destination/destination.go
@@ -0,0 +1,236 @@
+// The destination package handles uploading to a specific destination (it delegates
+// to the filestore or objectstore packages) based on options from the pre-authorization
+// API, and finalizes the upload.
+package destination
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"strconv"
+	"time"
+
+	"github.com/golang-jwt/jwt/v4"
+
+	"gitlab.com/gitlab-org/labkit/log"
+
+	"gitlab.com/gitlab-org/gitlab/workhorse/internal/secret"
+	"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/filestore"
+	"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
+)
+
+type SizeError error
+
+// ErrEntityTooLarge means that the uploaded content is bigger than the maximum allowed size
+var ErrEntityTooLarge = errors.New("entity is too large")
+
+// FileHandler represents a file that has been processed for upload.
+// It may be uploaded to an ObjectStore and/or saved to a local path.
+type FileHandler struct {
+	// LocalPath is the path on disk where the file has been stored
+	LocalPath string
+
+	// RemoteID is the objectID provided by GitLab Rails
+	RemoteID string
+	// RemoteURL is the ObjectStore URL provided by GitLab Rails
+	RemoteURL string
+
+	// Size is the persisted file size
+	Size int64
+
+	// Name is the resource name to send back to GitLab Rails.
+	// It differs from the real file name in order to avoid file collisions
+	Name string
+
+	// a map containing different hashes
+	hashes map[string]string
+
+	// Duration of upload in seconds
+	uploadDuration float64
+}
+
+type uploadClaims struct {
+	Upload map[string]string `json:"upload"`
+	jwt.StandardClaims
+}
+
+// SHA256 hash of the handled file
+func (fh *FileHandler) SHA256() string {
+	return fh.hashes["sha256"]
+}
+
+// MD5 hash of the handled file
+func (fh *FileHandler) MD5() string {
+	return fh.hashes["md5"]
+}
+
+// GitLabFinalizeFields returns a map with all the fields GitLab Rails needs in order to finalize the upload.
+func (fh *FileHandler) GitLabFinalizeFields(prefix string) (map[string]string, error) {
+	// TODO: remove the `data` fields once Rails fully and exclusively supports `signedData` (https://gitlab.com/gitlab-org/gitlab/-/issues/324873)
+	data := make(map[string]string)
+	signedData := make(map[string]string)
+	key := func(field string) string {
+		if prefix == "" {
+			return field
+		}
+
+		return fmt.Sprintf("%s.%s", prefix, field)
+	}
+
+	for k, v := range map[string]string{
+		"name":            fh.Name,
+		"path":            fh.LocalPath,
+		"remote_url":      fh.RemoteURL,
+		"remote_id":       fh.RemoteID,
+		"size":            strconv.FormatInt(fh.Size, 10),
+		"upload_duration": strconv.FormatFloat(fh.uploadDuration, 'f', -1, 64),
+	} {
+		data[key(k)] = v
+		signedData[k] = v
+	}
+
+	for hashName, hash := range fh.hashes {
+		data[key(hashName)] = hash
+		signedData[hashName] = hash
+	}
+
+	claims := uploadClaims{Upload: signedData, StandardClaims: secret.DefaultClaims}
+	jwtData, err := secret.JWTTokenString(claims)
+	if err != nil {
+		return nil, err
+	}
+	data[key("gitlab-workhorse-upload")] = jwtData
+
+	return data, nil
+}
+
+type consumer interface {
+	Consume(context.Context, io.Reader, time.Time) (int64, error)
+}
+
+// Upload persists the provided reader content to all the locations specified in opts. A cleanup will be performed once ctx is Done.
+// Make sure the provided context will not expire before finalizing the upload with GitLab Rails.
+func Upload(ctx context.Context, reader io.Reader, size int64, opts *UploadOpts) (*FileHandler, error) {
+	fh := &FileHandler{
+		Name:      opts.TempFilePrefix,
+		RemoteID:  opts.RemoteID,
+		RemoteURL: opts.RemoteURL,
+	}
+	uploadStartTime := time.Now()
+	defer func() { fh.uploadDuration = time.Since(uploadStartTime).Seconds() }()
+	hashes := newMultiHash()
+	reader = io.TeeReader(reader, hashes.Writer)
+
+	var clientMode string
+	var uploadDestination consumer
+	var err error
+	switch {
+	case opts.IsLocal():
+		clientMode = "local"
+		uploadDestination, err = fh.newLocalFile(ctx, opts)
+	case opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsGoCloud():
+		clientMode = fmt.Sprintf("go_cloud:%s", opts.ObjectStorageConfig.Provider)
+		p := &objectstore.GoCloudObjectParams{
+			Ctx:        ctx,
+			Mux:        opts.ObjectStorageConfig.URLMux,
+			BucketURL:  opts.ObjectStorageConfig.GoCloudConfig.URL,
+			ObjectName: opts.RemoteTempObjectID,
+		}
+		uploadDestination, err = objectstore.NewGoCloudObject(p)
+	case opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsAWS() && opts.ObjectStorageConfig.IsValid():
+		clientMode = "s3"
+		uploadDestination, err = objectstore.NewS3Object(
+			opts.RemoteTempObjectID,
+			opts.ObjectStorageConfig.S3Credentials,
+			opts.ObjectStorageConfig.S3Config,
+		)
+	case opts.IsMultipart():
+		clientMode = "multipart"
+		uploadDestination, err = objectstore.NewMultipart(
+			opts.PresignedParts,
+			opts.PresignedCompleteMultipart,
+			opts.PresignedAbortMultipart,
+			opts.PresignedDelete,
+			opts.PutHeaders,
+			opts.PartSize,
+		)
+	default:
+		clientMode = "http"
+		uploadDestination, err = objectstore.NewObject(
+			opts.PresignedPut,
+			opts.PresignedDelete,
+			opts.PutHeaders,
+			size,
+		)
+	}
+
+	if err != nil {
+		return nil, err
+	}
+
+	var hlr *hardLimitReader
+	if opts.MaximumSize > 0 {
+		if size > opts.MaximumSize {
+			return nil, SizeError(fmt.Errorf("the upload size %d is over maximum of %d bytes", size, opts.MaximumSize))
+		}
+
+		hlr = &hardLimitReader{r: reader, n: opts.MaximumSize}
+		reader = hlr
+	}
+
+	fh.Size, err = uploadDestination.Consume(ctx, reader, opts.Deadline)
+	if err != nil {
+		if (err == objectstore.ErrNotEnoughParts) || (hlr != nil && hlr.n < 0) {
+			err = ErrEntityTooLarge
+		}
+		return nil, err
+	}
+
+	if size != -1 && size != fh.Size {
+		return nil, SizeError(fmt.Errorf("expected %d bytes but got only %d", size, fh.Size))
+	}
+
+	logger := log.WithContextFields(ctx, log.Fields{
+		"copied_bytes":     fh.Size,
+		"is_local":         opts.IsLocal(),
+		"is_multipart":     opts.IsMultipart(),
+		"is_remote":        !opts.IsLocal(),
+		"remote_id":        opts.RemoteID,
+		"temp_file_prefix": opts.TempFilePrefix,
+		"client_mode":      clientMode,
+	})
+
+	if opts.IsLocal() {
+		logger = logger.WithField("local_temp_path", opts.LocalTempPath)
+	} else {
+		logger = logger.WithField("remote_temp_object", opts.RemoteTempObjectID)
+	}
+
+	logger.Info("saved file")
+	fh.hashes = hashes.finish()
+	return fh, nil
+}
+
+func (fh *FileHandler) newLocalFile(ctx context.Context, opts *UploadOpts) (consumer, error) {
+	// make sure TempFolder exists
+	err := os.MkdirAll(opts.LocalTempPath, 0700)
+	if err != nil {
+		return nil, fmt.Errorf("newLocalFile: mkdir %q: %v", opts.LocalTempPath, err)
+	}
+
+	file, err := ioutil.TempFile(opts.LocalTempPath, opts.TempFilePrefix)
+	if err != nil {
+		return nil, fmt.Errorf("newLocalFile: create file: %v", err)
+	}
+
+	go func() {
+		<-ctx.Done()
+		os.Remove(file.Name())
+	}()
+
+	fh.LocalPath = file.Name()
+	return &filestore.LocalFile{File: file}, nil
+}
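Aside: destination.Upload dispatches on a small unexported consumer interface, which is what lets LocalFile, Multipart, S3Object, and GoCloudObject all plug into the same code path. A minimal, self-contained sketch of that contract (not part of this diff; bufferConsumer is a hypothetical in-memory destination):

package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"strings"
	"time"
)

// consumer mirrors the unexported interface destination.Upload dispatches on.
type consumer interface {
	Consume(context.Context, io.Reader, time.Time) (int64, error)
}

// bufferConsumer is a toy destination that stores the upload in memory.
type bufferConsumer struct{ buf bytes.Buffer }

func (b *bufferConsumer) Consume(_ context.Context, r io.Reader, _ time.Time) (int64, error) {
	return io.Copy(&b.buf, r)
}

func main() {
	var dst consumer = &bufferConsumer{}
	n, err := dst.Consume(context.Background(), strings.NewReader("hello artifact"), time.Now().Add(time.Minute))
	if err != nil {
		panic(err)
	}
	fmt.Printf("consumed %d bytes\n", n)
}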
diff --git a/workhorse/internal/upload/destination/destination_test.go b/workhorse/internal/upload/destination/destination_test.go
new file mode 100644
index 00000000000..ddf0ea24d60
--- /dev/null
+++ b/workhorse/internal/upload/destination/destination_test.go
@@ -0,0 +1,504 @@
+package destination_test
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path"
+	"strconv"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/golang-jwt/jwt/v4"
+	"github.com/stretchr/testify/require"
+	"gocloud.dev/blob"
+
+	"gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
+	"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
+	"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
+	"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
+)
+
+func testDeadline() time.Time {
+	return time.Now().Add(destination.DefaultObjectStoreTimeout)
+}
+
+func requireFileGetsRemovedAsync(t *testing.T, filePath string) {
+	var err error
+	require.Eventually(t, func() bool {
+		_, err = os.Stat(filePath)
+		return err != nil
+	}, 10*time.Second, 10*time.Millisecond)
+	require.True(t, os.IsNotExist(err), "File hasn't been deleted during cleanup")
+}
+
+func requireObjectStoreDeletedAsync(t *testing.T, expectedDeletes int, osStub *test.ObjectstoreStub) {
+	require.Eventually(t, func() bool { return osStub.DeletesCnt() == expectedDeletes }, time.Second, time.Millisecond, "Object not deleted")
+}
+
+func TestUploadWrongSize(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp")
+	require.NoError(t, err)
+	defer os.RemoveAll(tmpFolder)
+
+	opts := &destination.UploadOpts{LocalTempPath: tmpFolder, TempFilePrefix: "test-file"}
+	fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize+1, opts)
+	require.Error(t, err)
+	_, isSizeError := err.(destination.SizeError)
+	require.True(t, isSizeError, "Should fail with SizeError")
+	require.Nil(t, fh)
+}
+
+func TestUploadWithKnownSizeExceedLimit(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp")
+	require.NoError(t, err)
+	defer os.RemoveAll(tmpFolder)
+
+	opts := &destination.UploadOpts{LocalTempPath: tmpFolder, TempFilePrefix: "test-file", MaximumSize: test.ObjectSize - 1}
+	fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, opts)
+	require.Error(t, err)
+	_, isSizeError := err.(destination.SizeError)
+	require.True(t, isSizeError, "Should fail with SizeError")
+	require.Nil(t, fh)
+}
+
+func TestUploadWithUnknownSizeExceedLimit(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp")
+	require.NoError(t, err)
+	defer os.RemoveAll(tmpFolder)
+
+	opts := &destination.UploadOpts{LocalTempPath: tmpFolder, TempFilePrefix: "test-file", MaximumSize: test.ObjectSize - 1}
+	fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), -1, opts)
+	require.Equal(t, err, destination.ErrEntityTooLarge)
+	require.Nil(t, fh)
+}
+
+func TestUploadWrongETag(t *testing.T) {
+	tests := []struct {
+		name      string
+		multipart bool
+	}{
+		{name: "single part"},
+		{name: "multi part", multipart: true},
+	}
+
+	for _, spec := range tests {
+		t.Run(spec.name, func(t *testing.T) {
+			osStub, ts := test.StartObjectStoreWithCustomMD5(map[string]string{test.ObjectPath: "brokenMD5"})
+			defer ts.Close()
+
+			objectURL := ts.URL + test.ObjectPath
+
+			opts := &destination.UploadOpts{
+				RemoteID:        "test-file",
+				RemoteURL:       objectURL,
+				PresignedPut:    objectURL + "?Signature=ASignature",
+				PresignedDelete: objectURL + "?Signature=AnotherSignature",
+				Deadline:        testDeadline(),
+			}
+			if spec.multipart {
+				opts.PresignedParts = []string{objectURL + "?partNumber=1"}
+				opts.PresignedCompleteMultipart = objectURL + "?Signature=CompleteSig"
+				opts.PresignedAbortMultipart = objectURL + "?Signature=AbortSig"
+				opts.PartSize = test.ObjectSize
+
+				osStub.InitiateMultipartUpload(test.ObjectPath)
+			}
+			ctx, cancel := context.WithCancel(context.Background())
+			fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, opts)
+			require.Nil(t, fh)
+			require.Error(t, err)
+			require.Equal(t, 1, osStub.PutsCnt(), "File not uploaded")
+
+			cancel() // this will trigger an async cleanup
+			requireObjectStoreDeletedAsync(t, 1, osStub)
+			require.False(t, spec.multipart && osStub.IsMultipartUpload(test.ObjectPath), "there must be no multipart upload in progress now")
+		})
+	}
+}
+
+func TestUpload(t *testing.T) {
+	testhelper.ConfigureSecret()
+
+	type remote int
+	const (
+		notRemote remote = iota
+		remoteSingle
+		remoteMultipart
+	)
+
+	tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp")
+	require.NoError(t, err)
+	defer os.RemoveAll(tmpFolder)
+
+	tests := []struct {
+		name   string
+		local  bool
+		remote remote
+	}{
+		{name: "Local only", local: true},
+		{name: "Remote Single only", remote: remoteSingle},
+		{name: "Remote Multipart only", remote: remoteMultipart},
+	}
+
+	for _, spec := range tests {
+		t.Run(spec.name, func(t *testing.T) {
+			var opts destination.UploadOpts
+			var expectedDeletes, expectedPuts int
+
+			osStub, ts := test.StartObjectStore()
+			defer ts.Close()
+
+			switch spec.remote {
+			case remoteSingle:
+				objectURL := ts.URL + test.ObjectPath
+
+				opts.RemoteID = "test-file"
+				opts.RemoteURL = objectURL
+				opts.PresignedPut = objectURL + "?Signature=ASignature"
+				opts.PresignedDelete = objectURL + "?Signature=AnotherSignature"
+				opts.Deadline = testDeadline()
+
+				expectedDeletes = 1
+				expectedPuts = 1
+			case remoteMultipart:
+				objectURL := ts.URL + test.ObjectPath
+
+				opts.RemoteID = "test-file"
+				opts.RemoteURL = objectURL
+				opts.PresignedDelete = objectURL + "?Signature=AnotherSignature"
+				opts.PartSize = int64(len(test.ObjectContent)/2) + 1
+				opts.PresignedParts = []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"}
+				opts.PresignedCompleteMultipart = objectURL + "?Signature=CompleteSignature"
+				opts.Deadline = testDeadline()
+
+				osStub.InitiateMultipartUpload(test.ObjectPath)
+				expectedDeletes = 1
+				expectedPuts = 2
+			}
+
+			if spec.local {
+				opts.LocalTempPath = tmpFolder
+				opts.TempFilePrefix = "test-file"
+			}
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
+			require.NoError(t, err)
+			require.NotNil(t, fh)
+
+			require.Equal(t, opts.RemoteID, fh.RemoteID)
+			require.Equal(t, opts.RemoteURL, fh.RemoteURL)
+
+			if spec.local {
+				require.NotEmpty(t, fh.LocalPath, "File not persisted on disk")
+				_, err := os.Stat(fh.LocalPath)
+				require.NoError(t, err)
+
+				dir := path.Dir(fh.LocalPath)
+				require.Equal(t, opts.LocalTempPath, dir)
+				filename := path.Base(fh.LocalPath)
+				beginsWithPrefix := strings.HasPrefix(filename, opts.TempFilePrefix)
+				require.True(t, beginsWithPrefix, fmt.Sprintf("LocalPath filename %q does not begin with TempFilePrefix %q", filename, opts.TempFilePrefix))
+			} else {
+				require.Empty(t, fh.LocalPath, "LocalPath must be empty for non local uploads")
+			}
+
+			require.Equal(t, test.ObjectSize, fh.Size)
+			require.Equal(t, test.ObjectMD5, fh.MD5())
+			require.Equal(t, test.ObjectSHA256, fh.SHA256())
+
+			require.Equal(t, expectedPuts, osStub.PutsCnt(), "ObjectStore PutObject count mismatch")
+			require.Equal(t, 0, osStub.DeletesCnt(), "File deleted too early")
+
+			cancel() // this will trigger an async cleanup
+			requireObjectStoreDeletedAsync(t, expectedDeletes, osStub)
+			requireFileGetsRemovedAsync(t, fh.LocalPath)
+
+			// checking generated fields
+			fields, err := fh.GitLabFinalizeFields("file")
+			require.NoError(t, err)
+
+			checkFileHandlerWithFields(t, fh, fields, "file")
+
+			token, jwtErr := jwt.ParseWithClaims(fields["file.gitlab-workhorse-upload"], &testhelper.UploadClaims{}, testhelper.ParseJWT)
+			require.NoError(t, jwtErr)
+
+			uploadFields := token.Claims.(*testhelper.UploadClaims).Upload
+
+			checkFileHandlerWithFields(t, fh, uploadFields, "")
+		})
+	}
+}
+
+func TestUploadWithS3WorkhorseClient(t *testing.T) {
+	tests := []struct {
+		name        string
+		objectSize  int64
+		maxSize     int64
+		expectedErr error
+	}{
+		{
+			name:       "known size with no limit",
+			objectSize: test.ObjectSize,
+		},
+		{
+			name:       "unknown size with no limit",
+			objectSize: -1,
+		},
+		{
+			name:        "unknown object size with limit",
+			objectSize:  -1,
+			maxSize:     test.ObjectSize - 1,
+			expectedErr: destination.ErrEntityTooLarge,
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			s3Creds, s3Config, sess, ts := test.SetupS3(t, "")
+			defer ts.Close()
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			remoteObject := "tmp/test-file/1"
+			opts := destination.UploadOpts{
+				RemoteID:           "test-file",
+				Deadline:           testDeadline(),
+				UseWorkhorseClient: true,
+				RemoteTempObjectID: remoteObject,
+				ObjectStorageConfig: destination.ObjectStorageConfig{
+					Provider:      "AWS",
+					S3Credentials: s3Creds,
+					S3Config:      s3Config,
+				},
+				MaximumSize: tc.maxSize,
+			}
+
+			_, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), tc.objectSize, &opts)
+
+			if tc.expectedErr == nil {
+				require.NoError(t, err)
+				test.S3ObjectExists(t, sess, s3Config, remoteObject, test.ObjectContent)
+			} else {
+				require.Equal(t, tc.expectedErr, err)
+				test.S3ObjectDoesNotExist(t, sess, s3Config, remoteObject)
+			}
+		})
+	}
+}
+
+func TestUploadWithAzureWorkhorseClient(t *testing.T) {
+	mux, bucketDir, cleanup := test.SetupGoCloudFileBucket(t, "azblob")
+	defer cleanup()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	remoteObject := "tmp/test-file/1"
+	opts := destination.UploadOpts{
+		RemoteID:           "test-file",
+		Deadline:           testDeadline(),
+		UseWorkhorseClient: true,
+		RemoteTempObjectID: remoteObject,
+		ObjectStorageConfig: destination.ObjectStorageConfig{
+			Provider:      "AzureRM",
+			URLMux:        mux,
+			GoCloudConfig: config.GoCloudConfig{URL: "azblob://test-container"},
+		},
+	}
+
+	_, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
+	require.NoError(t, err)
+
+	test.GoCloudObjectExists(t, bucketDir, remoteObject)
+}
+
+func TestUploadWithUnknownGoCloudScheme(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	mux := new(blob.URLMux)
+
+	remoteObject := "tmp/test-file/1"
+	opts := destination.UploadOpts{
+		RemoteID:           "test-file",
+		Deadline:           testDeadline(),
+		UseWorkhorseClient: true,
+		RemoteTempObjectID: remoteObject,
+		ObjectStorageConfig: destination.ObjectStorageConfig{
+			Provider:      "SomeCloud",
+			URLMux:        mux,
+			GoCloudConfig: config.GoCloudConfig{URL: "foo://test-container"},
+		},
+	}
+
+	_, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
+	require.Error(t, err)
+}
+
+func TestUploadMultipartInBodyFailure(t *testing.T) {
+	osStub, ts := test.StartObjectStore()
+	defer ts.Close()
+
+	// this is a broken path because it contains a bucket name but no object key;
+	// this is the only way to get an in-body failure from our ObjectStoreStub
+	objectPath := "/bucket-but-no-object-key"
+	objectURL := ts.URL + objectPath
+	opts := destination.UploadOpts{
+		RemoteID:                   "test-file",
+		RemoteURL:                  objectURL,
+		PartSize:                   test.ObjectSize,
+		PresignedParts:             []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"},
+		PresignedCompleteMultipart: objectURL + "?Signature=CompleteSignature",
+		Deadline:                   testDeadline(),
+	}
+
+	osStub.InitiateMultipartUpload(objectPath)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
+	require.Nil(t, fh)
+	require.Error(t, err)
+	require.EqualError(t, err, test.MultipartUploadInternalError().Error())
+}
+
+func TestUploadRemoteFileWithLimit(t *testing.T) {
+	testhelper.ConfigureSecret()
+
+	type remote int
+	const (
+		notRemote remote = iota
+		remoteSingle
+		remoteMultipart
+	)
+
+	remoteTypes := []remote{remoteSingle, remoteMultipart}
+
+	tests := []struct {
+		name        string
+		objectSize  int64
+		maxSize     int64
+		expectedErr error
+		testData    string
+	}{
+		{
+			name:       "known size with no limit",
+			testData:   test.ObjectContent,
+			objectSize: test.ObjectSize,
+		},
+		{
+			name:       "unknown size with no limit",
+			testData:   test.ObjectContent,
+			objectSize: -1,
+		},
+		{
+			name:        "unknown object size with limit",
+			testData:    test.ObjectContent,
+			objectSize:  -1,
+			maxSize:     test.ObjectSize - 1,
+			expectedErr: destination.ErrEntityTooLarge,
+		},
+		{
+			name:        "large object with unknown size with limit",
+			testData:    string(make([]byte, 20000)),
+			objectSize:  -1,
+			maxSize:     19000,
+			expectedErr: destination.ErrEntityTooLarge,
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			var opts destination.UploadOpts
+
+			for _, remoteType := range remoteTypes {
+				tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp")
+				require.NoError(t, err)
+				defer os.RemoveAll(tmpFolder)
+
+				osStub, ts := test.StartObjectStore()
+				defer ts.Close()
+
+				switch remoteType {
+				case remoteSingle:
+					objectURL := ts.URL + test.ObjectPath
+
+					opts.RemoteID = "test-file"
+					opts.RemoteURL = objectURL
+					opts.PresignedPut = objectURL + "?Signature=ASignature"
+					opts.PresignedDelete = objectURL + "?Signature=AnotherSignature"
+					opts.Deadline = testDeadline()
+					opts.MaximumSize = tc.maxSize
+				case remoteMultipart:
+					objectURL := ts.URL + test.ObjectPath
+
+					opts.RemoteID = "test-file"
+					opts.RemoteURL = objectURL
+					opts.PresignedDelete = objectURL + "?Signature=AnotherSignature"
+					opts.PartSize = int64(len(tc.testData)/2) + 1
+					opts.PresignedParts = []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"}
+					opts.PresignedCompleteMultipart = objectURL + "?Signature=CompleteSignature"
+					opts.Deadline = testDeadline()
+					opts.MaximumSize = tc.maxSize
+
+					require.Less(t, int64(len(tc.testData)), int64(len(opts.PresignedParts))*opts.PartSize, "check part size calculation")
+
+					osStub.InitiateMultipartUpload(test.ObjectPath)
+				}
+
+				ctx, cancel := context.WithCancel(context.Background())
+				defer cancel()
+
+				fh, err := destination.Upload(ctx, strings.NewReader(tc.testData), tc.objectSize, &opts)
+
+				if tc.expectedErr == nil {
+					require.NoError(t, err)
+					require.NotNil(t, fh)
+				} else {
+					require.True(t, errors.Is(err, tc.expectedErr))
+					require.Nil(t, fh)
+				}
+			}
+		})
+	}
+}
+
+func checkFileHandlerWithFields(t *testing.T, fh *destination.FileHandler, fields map[string]string, prefix string) {
+	key := func(field string) string {
+		if prefix == "" {
+			return field
+		}
+
+		return fmt.Sprintf("%s.%s", prefix, field)
+	}
+
+	require.Equal(t, fh.Name, fields[key("name")])
+	require.Equal(t, fh.LocalPath, fields[key("path")])
+	require.Equal(t, fh.RemoteURL, fields[key("remote_url")])
+	require.Equal(t, fh.RemoteID, fields[key("remote_id")])
+	require.Equal(t, strconv.FormatInt(test.ObjectSize, 10), fields[key("size")])
+	require.Equal(t, test.ObjectMD5, fields[key("md5")])
+	require.Equal(t, test.ObjectSHA1, fields[key("sha1")])
+	require.Equal(t, test.ObjectSHA256, fields[key("sha256")])
+	require.Equal(t, test.ObjectSHA512, fields[key("sha512")])
+	require.NotEmpty(t, fields[key("upload_duration")])
}
diff --git a/workhorse/internal/upload/destination/filestore/filestore.go b/workhorse/internal/upload/destination/filestore/filestore.go
new file mode 100644
index 00000000000..2d88874bf25
--- /dev/null
+++ b/workhorse/internal/upload/destination/filestore/filestore.go
@@ -0,0 +1,21 @@
+// The filestore package has a consumer specific to uploading to local disk storage.
+package filestore
+
+import (
+	"context"
+	"io"
+	"time"
+)
+
+type LocalFile struct {
+	File io.WriteCloser
+}
+
+func (lf *LocalFile) Consume(_ context.Context, r io.Reader, _ time.Time) (int64, error) {
+	n, err := io.Copy(lf.File, r)
+	errClose := lf.File.Close()
+	if err == nil {
+		err = errClose
+	}
+	return n, err
+}
diff --git a/workhorse/internal/upload/destination/filestore/filestore_test.go b/workhorse/internal/upload/destination/filestore/filestore_test.go
new file mode 100644
index 00000000000..ec67eae96b9
--- /dev/null
+++ b/workhorse/internal/upload/destination/filestore/filestore_test.go
@@ -0,0 +1,38 @@
+package filestore
+
+import (
+	"context"
+	"io/ioutil"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestConsume(t *testing.T) {
+	f, err := ioutil.TempFile("", "filestore-local-file")
+	if f != nil {
+		defer os.Remove(f.Name())
+	}
+	require.NoError(t, err)
+	defer f.Close()
+
+	localFile := &LocalFile{File: f}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	content := "file content"
+	reader := strings.NewReader(content)
+	var deadline time.Time
+
+	n, err := localFile.Consume(ctx, reader, deadline)
+	require.NoError(t, err)
+	require.Equal(t, int64(len(content)), n)
+
+	consumedContent, err := ioutil.ReadFile(f.Name())
+	require.NoError(t, err)
+	require.Equal(t, content, string(consumedContent))
+}
diff --git a/workhorse/internal/upload/destination/multi_hash.go b/workhorse/internal/upload/destination/multi_hash.go
new file mode 100644
index 00000000000..7d4884af3dc
--- /dev/null
+++ b/workhorse/internal/upload/destination/multi_hash.go
@@ -0,0 +1,48 @@
+package destination
+
+import (
+	"crypto/md5"
+	"crypto/sha1"
+	"crypto/sha256"
+	"crypto/sha512"
+	"encoding/hex"
+	"hash"
+	"io"
+)
+
+var hashFactories = map[string](func() hash.Hash){
+	"md5":    md5.New,
+	"sha1":   sha1.New,
+	"sha256": sha256.New,
+	"sha512": sha512.New,
+}
+
+type multiHash struct {
+	io.Writer
+	hashes map[string]hash.Hash
+}
+
+func newMultiHash() (m *multiHash) {
+	m = &multiHash{}
+	m.hashes = make(map[string]hash.Hash)
+
+	var writers []io.Writer
+	for hash, hashFactory := range hashFactories {
+		writer := hashFactory()
+
+		m.hashes[hash] = writer
+		writers = append(writers, writer)
+	}
+
+	m.Writer = io.MultiWriter(writers...)
+	return m
+}
+
+func (m *multiHash) finish() map[string]string {
+	h := make(map[string]string)
+	for hashName, hash := range m.hashes {
+		checksum := hash.Sum(nil)
+		h[hashName] = hex.EncodeToString(checksum)
+	}
+	return h
+}
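Aside: multiHash computes MD5, SHA1, SHA256, and SHA512 in a single pass by fanning the upload stream out through io.MultiWriter, while destination.Upload tees the reader through it. A minimal, self-contained sketch of the same trick (not part of this diff; only two hashes are used here for brevity):

package main

import (
	"crypto/md5"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"hash"
	"io"
	"strings"
)

func main() {
	// Fan one stream out to several hashes, as multiHash does.
	hashes := map[string]hash.Hash{
		"md5":    md5.New(),
		"sha256": sha256.New(),
	}

	var writers []io.Writer
	for _, h := range hashes {
		writers = append(writers, h)
	}

	// TeeReader lets the content flow to its destination while the
	// MultiWriter observes every byte along the way.
	reader := io.TeeReader(strings.NewReader("file content"), io.MultiWriter(writers...))
	if _, err := io.Copy(io.Discard, reader); err != nil { // io.Discard stands in for the real destination
		panic(err)
	}

	for name, h := range hashes {
		fmt.Printf("%s: %s\n", name, hex.EncodeToString(h.Sum(nil)))
	}
}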
diff --git a/workhorse/internal/upload/destination/objectstore/doc.go b/workhorse/internal/upload/destination/objectstore/doc.go
new file mode 100644
index 00000000000..00f98f230ec
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/doc.go
@@ -0,0 +1,3 @@
+// The objectstore package consists of a number of consumers specific to uploading
+// to object storage providers that are S3 compatible (e.g. AWS S3, GCP, Azure).
+package objectstore
diff --git a/workhorse/internal/upload/destination/objectstore/gocloud_object.go b/workhorse/internal/upload/destination/objectstore/gocloud_object.go
new file mode 100644
index 00000000000..38545086994
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/gocloud_object.go
@@ -0,0 +1,100 @@
+package objectstore
+
+import (
+	"context"
+	"io"
+	"time"
+
+	"gitlab.com/gitlab-org/labkit/log"
+	"gocloud.dev/blob"
+	"gocloud.dev/gcerrors"
+)
+
+type GoCloudObject struct {
+	bucket     *blob.Bucket
+	mux        *blob.URLMux
+	bucketURL  string
+	objectName string
+	*uploader
+}
+
+type GoCloudObjectParams struct {
+	Ctx        context.Context
+	Mux        *blob.URLMux
+	BucketURL  string
+	ObjectName string
+}
+
+func NewGoCloudObject(p *GoCloudObjectParams) (*GoCloudObject, error) {
+	bucket, err := p.Mux.OpenBucket(p.Ctx, p.BucketURL)
+	if err != nil {
+		return nil, err
+	}
+
+	o := &GoCloudObject{
+		bucket:     bucket,
+		mux:        p.Mux,
+		bucketURL:  p.BucketURL,
+		objectName: p.ObjectName,
+	}
+
+	o.uploader = newUploader(o)
+	return o, nil
+}
+
+func (o *GoCloudObject) Upload(ctx context.Context, r io.Reader) error {
+	defer o.bucket.Close()
+
+	writer, err := o.bucket.NewWriter(ctx, o.objectName, nil)
+	if err != nil {
+		log.ContextLogger(ctx).WithError(err).Error("error creating GoCloud bucket")
+		return err
+	}
+
+	if _, err = io.Copy(writer, r); err != nil {
+		log.ContextLogger(ctx).WithError(err).Error("error writing to GoCloud bucket")
+		writer.Close()
+		return err
+	}
+
+	if err := writer.Close(); err != nil {
+		log.ContextLogger(ctx).WithError(err).Error("error closing GoCloud bucket")
+		return err
+	}
+
+	return nil
+}
+
+func (o *GoCloudObject) ETag() string {
+	return ""
+}
+
+func (o *GoCloudObject) Abort() {
+	o.Delete()
+}
+
+// Delete will always attempt to delete the temporary file.
+// According to https://github.com/google/go-cloud/blob/7818961b5c9a112f7e092d3a2d8479cbca80d187/blob/azureblob/azureblob.go#L881-L883,
+// if the writer is closed before any Write is called, Close will create an empty file.
+func (o *GoCloudObject) Delete() {
+	if o.bucketURL == "" || o.objectName == "" {
+		return
+	}
+
+	// Note we can't use the request context because in a successful
+	// case, the original request has already completed.
+	deleteCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // lint:allow context.Background
+	defer cancel()
+
+	bucket, err := o.mux.OpenBucket(deleteCtx, o.bucketURL)
+	if err != nil {
+		log.WithError(err).Error("error opening bucket for delete")
+		return
+	}
+
+	if err := bucket.Delete(deleteCtx, o.objectName); err != nil {
+		if gcerrors.Code(err) != gcerrors.NotFound {
+			log.WithError(err).Error("error deleting object")
+		}
+	}
+}
diff --git a/workhorse/internal/upload/destination/objectstore/gocloud_object_test.go b/workhorse/internal/upload/destination/objectstore/gocloud_object_test.go
new file mode 100644
index 00000000000..57b3a35b41e
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/gocloud_object_test.go
@@ -0,0 +1,56 @@
+package objectstore_test
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
+	"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
+	"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
+)
+
+func TestGoCloudObjectUpload(t *testing.T) {
+	mux, _, cleanup := test.SetupGoCloudFileBucket(t, "azuretest")
+	defer cleanup()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	deadline := time.Now().Add(testTimeout)
+
+	objectName := "test.png"
+	testURL := "azuretest://azure.example.com/test-container"
+	p := &objectstore.GoCloudObjectParams{Ctx: ctx, Mux: mux, BucketURL: testURL, ObjectName: objectName}
+	object, err := objectstore.NewGoCloudObject(p)
+	require.NotNil(t, object)
+	require.NoError(t, err)
+
+	// copy data
+	n, err := object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline)
+	require.NoError(t, err)
+	require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch")
+
+	bucket, err := mux.OpenBucket(ctx, testURL)
+	require.NoError(t, err)
+
+	// Verify the data was copied correctly.
+	received, err := bucket.ReadAll(ctx, objectName)
+	require.NoError(t, err)
+	require.Equal(t, []byte(test.ObjectContent), received)
+
+	cancel()
+
+	testhelper.Retry(t, 5*time.Second, func() error {
+		exists, err := bucket.Exists(ctx, objectName)
+		require.NoError(t, err)
+
+		if exists {
+			return fmt.Errorf("file %s is still present", objectName)
+		}
+		return nil
+	})
+}
+// It can be used as io.WriteCloser for uploading an object +type Multipart struct { + PartURLs []string + // CompleteURL is a presigned URL for CompleteMultipartUpload + CompleteURL string + // AbortURL is a presigned URL for AbortMultipartUpload + AbortURL string + // DeleteURL is a presigned URL for RemoveObject + DeleteURL string + PutHeaders map[string]string + partSize int64 + etag string + + *uploader +} + +// NewMultipart provides Multipart pointer that can be used for uploading. Data written will be split buffered on disk up to size bytes +// then uploaded with S3 Upload Part. Once Multipart is Closed a final call to CompleteMultipartUpload will be sent. +// In case of any error a call to AbortMultipartUpload will be made to cleanup all the resources +func NewMultipart(partURLs []string, completeURL, abortURL, deleteURL string, putHeaders map[string]string, partSize int64) (*Multipart, error) { + m := &Multipart{ + PartURLs: partURLs, + CompleteURL: completeURL, + AbortURL: abortURL, + DeleteURL: deleteURL, + PutHeaders: putHeaders, + partSize: partSize, + } + + m.uploader = newUploader(m) + return m, nil +} + +func (m *Multipart) Upload(ctx context.Context, r io.Reader) error { + cmu := &CompleteMultipartUpload{} + for i, partURL := range m.PartURLs { + src := io.LimitReader(r, m.partSize) + part, err := m.readAndUploadOnePart(ctx, partURL, m.PutHeaders, src, i+1) + if err != nil { + return err + } + if part == nil { + break + } else { + cmu.Part = append(cmu.Part, part) + } + } + + n, err := io.Copy(ioutil.Discard, r) + if err != nil { + return fmt.Errorf("drain pipe: %v", err) + } + if n > 0 { + return ErrNotEnoughParts + } + + if err := m.complete(ctx, cmu); err != nil { + return err + } + + return nil +} + +func (m *Multipart) ETag() string { + return m.etag +} +func (m *Multipart) Abort() { + deleteURL(m.AbortURL) +} + +func (m *Multipart) Delete() { + deleteURL(m.DeleteURL) +} + +func (m *Multipart) readAndUploadOnePart(ctx context.Context, partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) { + file, err := ioutil.TempFile("", "part-buffer") + if err != nil { + return nil, fmt.Errorf("create temporary buffer file: %v", err) + } + defer file.Close() + + if err := os.Remove(file.Name()); err != nil { + return nil, err + } + + n, err := io.Copy(file, src) + if err != nil { + return nil, err + } + if n == 0 { + return nil, nil + } + + if _, err = file.Seek(0, io.SeekStart); err != nil { + return nil, fmt.Errorf("rewind part %d temporary dump : %v", partNumber, err) + } + + etag, err := m.uploadPart(ctx, partURL, putHeaders, file, n) + if err != nil { + return nil, fmt.Errorf("upload part %d: %v", partNumber, err) + } + return &completeMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil +} + +func (m *Multipart) uploadPart(ctx context.Context, url string, headers map[string]string, body io.Reader, size int64) (string, error) { + deadline, ok := ctx.Deadline() + if !ok { + return "", fmt.Errorf("missing deadline") + } + + part, err := newObject(url, "", headers, size, false) + if err != nil { + return "", err + } + + if n, err := part.Consume(ctx, io.LimitReader(body, size), deadline); err != nil || n < size { + if err == nil { + err = io.ErrUnexpectedEOF + } + return "", err + } + + return part.ETag(), nil +} + +func (m *Multipart) complete(ctx context.Context, cmu *CompleteMultipartUpload) error { + body, err := xml.Marshal(cmu) + if err != nil { + return fmt.Errorf("marshal CompleteMultipartUpload 
request: %v", err) + } + + req, err := http.NewRequest("POST", m.CompleteURL, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("create CompleteMultipartUpload request: %v", err) + } + req.ContentLength = int64(len(body)) + req.Header.Set("Content-Type", "application/xml") + req = req.WithContext(ctx) + + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("CompleteMultipartUpload request %q: %v", mask.URL(m.CompleteURL), err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("CompleteMultipartUpload request %v returned: %s", mask.URL(m.CompleteURL), resp.Status) + } + + result := &compoundCompleteMultipartUploadResult{} + decoder := xml.NewDecoder(resp.Body) + if err := decoder.Decode(&result); err != nil { + return fmt.Errorf("decode CompleteMultipartUpload answer: %v", err) + } + + if result.isError() { + return result + } + + if result.CompleteMultipartUploadResult == nil { + return fmt.Errorf("empty CompleteMultipartUploadResult") + } + + m.etag = extractETag(result.ETag) + + return nil +} diff --git a/workhorse/internal/upload/destination/objectstore/multipart_test.go b/workhorse/internal/upload/destination/objectstore/multipart_test.go new file mode 100644 index 00000000000..4aff3467e30 --- /dev/null +++ b/workhorse/internal/upload/destination/objectstore/multipart_test.go @@ -0,0 +1,64 @@ +package objectstore_test + +import ( + "context" + "io/ioutil" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test" +) + +func TestMultipartUploadWithUpcaseETags(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var putCnt, postCnt int + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := ioutil.ReadAll(r.Body) + require.NoError(t, err) + defer r.Body.Close() + + // Part upload request + if r.Method == "PUT" { + putCnt++ + + w.Header().Set("ETag", strings.ToUpper(test.ObjectMD5)) + } + + // POST with CompleteMultipartUpload request + if r.Method == "POST" { + completeBody := `<CompleteMultipartUploadResult> + <Bucket>test-bucket</Bucket> + <ETag>No Longer Checked</ETag> + </CompleteMultipartUploadResult>` + postCnt++ + + w.Write([]byte(completeBody)) + } + })) + defer ts.Close() + + deadline := time.Now().Add(testTimeout) + + m, err := objectstore.NewMultipart( + []string{ts.URL}, // a single presigned part URL + ts.URL, // the complete multipart upload URL + "", // no abort + "", // no delete + map[string]string{}, // no custom headers + test.ObjectSize) // parts size equal to the whole content. 
Only 1 part + require.NoError(t, err) + + _, err = m.Consume(ctx, strings.NewReader(test.ObjectContent), deadline) + require.NoError(t, err) + require.Equal(t, 1, putCnt, "1 part expected") + require.Equal(t, 1, postCnt, "1 complete multipart upload expected") +} diff --git a/workhorse/internal/upload/destination/objectstore/object.go b/workhorse/internal/upload/destination/objectstore/object.go new file mode 100644 index 00000000000..b7c4f12f009 --- /dev/null +++ b/workhorse/internal/upload/destination/objectstore/object.go @@ -0,0 +1,95 @@ +package objectstore + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + + "gitlab.com/gitlab-org/labkit/mask" + + "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper/httptransport" +) + +var httpClient = &http.Client{ + Transport: httptransport.New(), +} + +// Object represents an object on a S3 compatible Object Store service. +// It can be used as io.WriteCloser for uploading an object +type Object struct { + // putURL is a presigned URL for PutObject + putURL string + // deleteURL is a presigned URL for RemoveObject + deleteURL string + putHeaders map[string]string + size int64 + etag string + metrics bool + + *uploader +} + +type StatusCodeError error + +// NewObject opens an HTTP connection to Object Store and returns an Object pointer that can be used for uploading. +func NewObject(putURL, deleteURL string, putHeaders map[string]string, size int64) (*Object, error) { + return newObject(putURL, deleteURL, putHeaders, size, true) +} + +func newObject(putURL, deleteURL string, putHeaders map[string]string, size int64, metrics bool) (*Object, error) { + o := &Object{ + putURL: putURL, + deleteURL: deleteURL, + putHeaders: putHeaders, + size: size, + metrics: metrics, + } + + o.uploader = newETagCheckUploader(o, metrics) + return o, nil +} + +func (o *Object) Upload(ctx context.Context, r io.Reader) error { + // we should prevent pr.Close() otherwise it may shadow error set with pr.CloseWithError(err) + req, err := http.NewRequest(http.MethodPut, o.putURL, ioutil.NopCloser(r)) + + if err != nil { + return fmt.Errorf("PUT %q: %v", mask.URL(o.putURL), err) + } + req.ContentLength = o.size + + for k, v := range o.putHeaders { + req.Header.Set(k, v) + } + + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("PUT request %q: %v", mask.URL(o.putURL), err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + if o.metrics { + objectStorageUploadRequestsInvalidStatus.Inc() + } + return StatusCodeError(fmt.Errorf("PUT request %v returned: %s", mask.URL(o.putURL), resp.Status)) + } + + o.etag = extractETag(resp.Header.Get("ETag")) + + return nil +} + +func (o *Object) ETag() string { + return o.etag +} + +func (o *Object) Abort() { + o.Delete() +} + +func (o *Object) Delete() { + deleteURL(o.deleteURL) +} diff --git a/workhorse/internal/upload/destination/objectstore/object_test.go b/workhorse/internal/upload/destination/objectstore/object_test.go new file mode 100644 index 00000000000..24117891b6d --- /dev/null +++ b/workhorse/internal/upload/destination/objectstore/object_test.go @@ -0,0 +1,149 @@ +package objectstore_test + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test" +) + +const testTimeout = 10 * time.Second + +type osFactory func() 
(*test.ObjectstoreStub, *httptest.Server) + +func testObjectUploadNoErrors(t *testing.T, startObjectStore osFactory, useDeleteURL bool, contentType string) { + osStub, ts := startObjectStore() + defer ts.Close() + + objectURL := ts.URL + test.ObjectPath + var deleteURL string + if useDeleteURL { + deleteURL = objectURL + } + + putHeaders := map[string]string{"Content-Type": contentType} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + deadline := time.Now().Add(testTimeout) + object, err := objectstore.NewObject(objectURL, deleteURL, putHeaders, test.ObjectSize) + require.NoError(t, err) + + // copy data + n, err := object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline) + require.NoError(t, err) + require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch") + + require.Equal(t, contentType, osStub.GetHeader(test.ObjectPath, "Content-Type")) + + // Checking MD5 extraction + require.Equal(t, osStub.GetObjectMD5(test.ObjectPath), object.ETag()) + + // Checking cleanup + cancel() + require.Equal(t, 1, osStub.PutsCnt(), "Object hasn't been uploaded") + + var expectedDeleteCnt int + if useDeleteURL { + expectedDeleteCnt = 1 + } + require.Eventually(t, func() bool { return osStub.DeletesCnt() == expectedDeleteCnt }, time.Second, time.Millisecond) + + if useDeleteURL { + require.Equal(t, 1, osStub.DeletesCnt(), "Object hasn't been deleted") + } else { + require.Equal(t, 0, osStub.DeletesCnt(), "Object has been deleted") + } +} + +func TestObjectUpload(t *testing.T) { + t.Run("with delete URL", func(t *testing.T) { + testObjectUploadNoErrors(t, test.StartObjectStore, true, "application/octet-stream") + }) + t.Run("without delete URL", func(t *testing.T) { + testObjectUploadNoErrors(t, test.StartObjectStore, false, "application/octet-stream") + }) + t.Run("with custom content type", func(t *testing.T) { + testObjectUploadNoErrors(t, test.StartObjectStore, false, "image/jpeg") + }) + t.Run("with upcase ETAG", func(t *testing.T) { + factory := func() (*test.ObjectstoreStub, *httptest.Server) { + md5s := map[string]string{ + test.ObjectPath: strings.ToUpper(test.ObjectMD5), + } + + return test.StartObjectStoreWithCustomMD5(md5s) + } + + testObjectUploadNoErrors(t, factory, false, "application/octet-stream") + }) +} + +func TestObjectUpload404(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + deadline := time.Now().Add(testTimeout) + objectURL := ts.URL + test.ObjectPath + object, err := objectstore.NewObject(objectURL, "", map[string]string{}, test.ObjectSize) + require.NoError(t, err) + _, err = object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline) + + require.Error(t, err) + _, isStatusCodeError := err.(objectstore.StatusCodeError) + require.True(t, isStatusCodeError, "Should fail with StatusCodeError") + require.Contains(t, err.Error(), "404") +} + +type endlessReader struct{} + +func (e *endlessReader) Read(p []byte) (n int, err error) { + for i := 0; i < len(p); i++ { + p[i] = '*' + } + + return len(p), nil +} + +// TestObjectUploadBrokenConnection purpose is to ensure that errors caused by the upload destination get propagated back correctly. +// This is important for troubleshooting in production. 
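The shadowing that the next test guards against typically arises from io.Pipe plumbing: if the consuming side closes the pipe with a plain Close, the producer sees io.ErrClosedPipe instead of the real failure. A minimal sketch of the propagation idea (the simulated failure is hypothetical):

package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
)

func main() {
	pr, pw := io.Pipe()
	uploadErr := errors.New("upload destination failed")

	go func() {
		// Simulate a consumer that fails mid-upload. CloseWithError
		// surfaces the real error to the writing side; a plain
		// pr.Close() would surface io.ErrClosedPipe instead.
		io.CopyN(ioutil.Discard, pr, 4)
		pr.CloseWithError(uploadErr)
	}()

	_, err := io.Copy(pw, bytes.NewReader(make([]byte, 1<<20)))
	fmt.Println(errors.Is(err, uploadErr)) // true: the real error survives
}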
+func TestObjectUploadBrokenConnection(t *testing.T) { + // This test server closes connection immediately + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + hj, ok := w.(http.Hijacker) + if !ok { + require.FailNow(t, "webserver doesn't support hijacking") + } + conn, _, err := hj.Hijack() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + conn.Close() + })) + defer ts.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + deadline := time.Now().Add(testTimeout) + objectURL := ts.URL + test.ObjectPath + object, err := objectstore.NewObject(objectURL, "", map[string]string{}, -1) + require.NoError(t, err) + + _, copyErr := object.Consume(ctx, &endlessReader{}, deadline) + require.Error(t, copyErr) + require.NotEqual(t, io.ErrClosedPipe, copyErr, "We are shadowing the real error") +} diff --git a/workhorse/internal/upload/destination/objectstore/prometheus.go b/workhorse/internal/upload/destination/objectstore/prometheus.go new file mode 100644 index 00000000000..20762fb52bc --- /dev/null +++ b/workhorse/internal/upload/destination/objectstore/prometheus.go @@ -0,0 +1,39 @@ +package objectstore + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + objectStorageUploadRequests = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitlab_workhorse_object_storage_upload_requests", + Help: "How many object storage requests have been processed", + }, + []string{"status"}, + ) + objectStorageUploadsOpen = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "gitlab_workhorse_object_storage_upload_open", + Help: "Describes many object storage requests are open now", + }, + ) + objectStorageUploadBytes = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "gitlab_workhorse_object_storage_upload_bytes", + Help: "How many bytes were sent to object storage", + }, + ) + objectStorageUploadTime = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "gitlab_workhorse_object_storage_upload_time", + Help: "How long it took to upload objects", + Buckets: objectStorageUploadTimeBuckets, + }) + + objectStorageUploadRequestsRequestFailed = objectStorageUploadRequests.WithLabelValues("request-failed") + objectStorageUploadRequestsInvalidStatus = objectStorageUploadRequests.WithLabelValues("invalid-status") + + objectStorageUploadTimeBuckets = []float64{.1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100} +) diff --git a/workhorse/internal/upload/destination/objectstore/s3_complete_multipart_api.go b/workhorse/internal/upload/destination/objectstore/s3_complete_multipart_api.go new file mode 100644 index 00000000000..b84f5757f49 --- /dev/null +++ b/workhorse/internal/upload/destination/objectstore/s3_complete_multipart_api.go @@ -0,0 +1,51 @@ +package objectstore + +import ( + "encoding/xml" + "fmt" +) + +// CompleteMultipartUpload is the S3 CompleteMultipartUpload body +type CompleteMultipartUpload struct { + Part []*completeMultipartUploadPart +} + +type completeMultipartUploadPart struct { + PartNumber int + ETag string +} + +// CompleteMultipartUploadResult is the S3 answer to CompleteMultipartUpload request +type CompleteMultipartUploadResult struct { + Location string + Bucket string + Key string + ETag string +} + +// CompleteMultipartUploadError is the in-body error structure +// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html#mpUploadComplete-examples +// the answer contains other 
fields we are not using +type CompleteMultipartUploadError struct { + XMLName xml.Name `xml:"Error"` + Code string + Message string +} + +func (c *CompleteMultipartUploadError) Error() string { + return fmt.Sprintf("CompleteMultipartUpload remote error %q: %s", c.Code, c.Message) +} + +// compoundCompleteMultipartUploadResult holds both CompleteMultipartUploadResult and CompleteMultipartUploadError +// this allow us to deserialize the response body where the root element can either be Error orCompleteMultipartUploadResult +type compoundCompleteMultipartUploadResult struct { + *CompleteMultipartUploadResult + *CompleteMultipartUploadError + + // XMLName this overrides CompleteMultipartUploadError.XMLName tags + XMLName xml.Name +} + +func (c *compoundCompleteMultipartUploadResult) isError() bool { + return c.CompleteMultipartUploadError != nil +} diff --git a/workhorse/internal/upload/destination/objectstore/s3_object.go b/workhorse/internal/upload/destination/objectstore/s3_object.go new file mode 100644 index 00000000000..ce0cccc7eb1 --- /dev/null +++ b/workhorse/internal/upload/destination/objectstore/s3_object.go @@ -0,0 +1,119 @@ +package objectstore + +import ( + "context" + "io" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "gitlab.com/gitlab-org/labkit/log" + + "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" +) + +type S3Object struct { + credentials config.S3Credentials + config config.S3Config + objectName string + uploaded bool + + *uploader +} + +func NewS3Object(objectName string, s3Credentials config.S3Credentials, s3Config config.S3Config) (*S3Object, error) { + o := &S3Object{ + credentials: s3Credentials, + config: s3Config, + objectName: objectName, + } + + o.uploader = newUploader(o) + return o, nil +} + +func setEncryptionOptions(input *s3manager.UploadInput, s3Config config.S3Config) { + if s3Config.ServerSideEncryption != "" { + input.ServerSideEncryption = aws.String(s3Config.ServerSideEncryption) + + if s3Config.ServerSideEncryption == s3.ServerSideEncryptionAwsKms && s3Config.SSEKMSKeyID != "" { + input.SSEKMSKeyId = aws.String(s3Config.SSEKMSKeyID) + } + } +} + +func (s *S3Object) Upload(ctx context.Context, r io.Reader) error { + sess, err := setupS3Session(s.credentials, s.config) + if err != nil { + log.WithError(err).Error("error creating S3 session") + return err + } + + uploader := s3manager.NewUploader(sess) + + input := &s3manager.UploadInput{ + Bucket: aws.String(s.config.Bucket), + Key: aws.String(s.objectName), + Body: r, + } + + setEncryptionOptions(input, s.config) + + _, err = uploader.UploadWithContext(ctx, input) + if err != nil { + log.WithError(err).Error("error uploading S3 session") + // Get the root cause, such as ErrEntityTooLarge, so we can return the proper HTTP status code + return unwrapAWSError(err) + } + + s.uploaded = true + + return nil +} + +func (s *S3Object) ETag() string { + return "" +} + +func (s *S3Object) Abort() { + s.Delete() +} + +func (s *S3Object) Delete() { + if !s.uploaded { + return + } + + session, err := setupS3Session(s.credentials, s.config) + if err != nil { + log.WithError(err).Error("error setting up S3 session in delete") + return + } + + svc := s3.New(session) + input := &s3.DeleteObjectInput{ + Bucket: aws.String(s.config.Bucket), + Key: aws.String(s.objectName), + } + + // Note we can't use the request context because in a successful + // case, the original request has 
already completed. + deleteCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // lint:allow context.Background + defer cancel() + + _, err = svc.DeleteObjectWithContext(deleteCtx, input) + if err != nil { + log.WithError(err).Error("error deleting S3 object", err) + } +} + +// This is needed until https://github.com/aws/aws-sdk-go/issues/2820 is closed. +func unwrapAWSError(e error) error { + if awsErr, ok := e.(awserr.Error); ok { + return unwrapAWSError(awsErr.OrigErr()) + } + + return e +} diff --git a/workhorse/internal/upload/destination/objectstore/s3_object_test.go b/workhorse/internal/upload/destination/objectstore/s3_object_test.go new file mode 100644 index 00000000000..b81b0ae2024 --- /dev/null +++ b/workhorse/internal/upload/destination/objectstore/s3_object_test.go @@ -0,0 +1,174 @@ +package objectstore_test + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/stretchr/testify/require" + + "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test" +) + +type failedReader struct { + io.Reader +} + +func (r *failedReader) Read(p []byte) (int, error) { + origErr := fmt.Errorf("entity is too large") + return 0, awserr.New("Read", "read failed", origErr) +} + +func TestS3ObjectUpload(t *testing.T) { + testCases := []struct { + encryption string + }{ + {encryption: ""}, + {encryption: s3.ServerSideEncryptionAes256}, + {encryption: s3.ServerSideEncryptionAwsKms}, + } + + for _, tc := range testCases { + t.Run(fmt.Sprintf("encryption=%v", tc.encryption), func(t *testing.T) { + creds, config, sess, ts := test.SetupS3(t, tc.encryption) + defer ts.Close() + + deadline := time.Now().Add(testTimeout) + tmpDir, err := ioutil.TempDir("", "workhorse-test-") + require.NoError(t, err) + defer os.Remove(tmpDir) + + objectName := filepath.Join(tmpDir, "s3-test-data") + ctx, cancel := context.WithCancel(context.Background()) + + object, err := objectstore.NewS3Object(objectName, creds, config) + require.NoError(t, err) + + // copy data + n, err := object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline) + require.NoError(t, err) + require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch") + + test.S3ObjectExists(t, sess, config, objectName, test.ObjectContent) + test.CheckS3Metadata(t, sess, config, objectName) + + cancel() + + testhelper.Retry(t, 5*time.Second, func() error { + if test.S3ObjectDoesNotExist(t, sess, config, objectName) { + return nil + } + + return fmt.Errorf("file is still present") + }) + }) + } +} + +func TestConcurrentS3ObjectUpload(t *testing.T) { + creds, uploadsConfig, uploadsSession, uploadServer := test.SetupS3WithBucket(t, "uploads", "") + defer uploadServer.Close() + + // This will return a separate S3 endpoint + _, artifactsConfig, artifactsSession, artifactsServer := test.SetupS3WithBucket(t, "artifacts", "") + defer artifactsServer.Close() + + deadline := time.Now().Add(testTimeout) + tmpDir, err := ioutil.TempDir("", "workhorse-test-") + require.NoError(t, err) + defer os.Remove(tmpDir) + + var wg sync.WaitGroup + + for i := 0; i < 4; i++ { + wg.Add(1) + + go func(index int) { + var sess 
*session.Session + var config config.S3Config + + if index%2 == 0 { + sess = uploadsSession + config = uploadsConfig + } else { + sess = artifactsSession + config = artifactsConfig + } + + name := fmt.Sprintf("s3-test-data-%d", index) + objectName := filepath.Join(tmpDir, name) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + object, err := objectstore.NewS3Object(objectName, creds, config) + require.NoError(t, err) + + // copy data + n, err := object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline) + require.NoError(t, err) + require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch") + + test.S3ObjectExists(t, sess, config, objectName, test.ObjectContent) + wg.Done() + }(i) + } + + wg.Wait() +} + +func TestS3ObjectUploadCancel(t *testing.T) { + creds, config, _, ts := test.SetupS3(t, "") + defer ts.Close() + + ctx, cancel := context.WithCancel(context.Background()) + + deadline := time.Now().Add(testTimeout) + tmpDir, err := ioutil.TempDir("", "workhorse-test-") + require.NoError(t, err) + defer os.Remove(tmpDir) + + objectName := filepath.Join(tmpDir, "s3-test-data") + + object, err := objectstore.NewS3Object(objectName, creds, config) + + require.NoError(t, err) + + // Cancel the transfer before the data has been copied to ensure + // we handle this gracefully. + cancel() + + _, err = object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline) + require.Error(t, err) + require.Equal(t, "context canceled", err.Error()) +} + +func TestS3ObjectUploadLimitReached(t *testing.T) { + creds, config, _, ts := test.SetupS3(t, "") + defer ts.Close() + + deadline := time.Now().Add(testTimeout) + tmpDir, err := ioutil.TempDir("", "workhorse-test-") + require.NoError(t, err) + defer os.Remove(tmpDir) + + objectName := filepath.Join(tmpDir, "s3-test-data") + object, err := objectstore.NewS3Object(objectName, creds, config) + require.NoError(t, err) + + _, err = object.Consume(context.Background(), &failedReader{}, deadline) + require.Error(t, err) + require.Equal(t, "entity is too large", err.Error()) +} diff --git a/workhorse/internal/upload/destination/objectstore/s3_session.go b/workhorse/internal/upload/destination/objectstore/s3_session.go new file mode 100644 index 00000000000..a0c1f099145 --- /dev/null +++ b/workhorse/internal/upload/destination/objectstore/s3_session.go @@ -0,0 +1,94 @@ +package objectstore + +import ( + "sync" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + + "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" +) + +type s3Session struct { + session *session.Session + expiry time.Time +} + +type s3SessionCache struct { + // An S3 session is cached by its input configuration (e.g. region, + // endpoint, path style, etc.), but the bucket is actually + // determined by the type of object to be uploaded (e.g. CI + // artifact, LFS, etc.) during runtime. In practice, we should only + // need one session per Workhorse process if we only allow one + // configuration for many different buckets. However, using a map + // indexed by the config avoids potential pitfalls in case the + // bucket configuration is supplied at startup or we need to support + // multiple S3 endpoints. 
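A short in-package sketch of what config-keyed caching means in practice (the endpoint value is an illustrative assumption):

package objectstore

import "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"

func sessionReuseSketch() {
	creds := config.S3Credentials{}
	cfgA := config.S3Config{Region: "us-east-1", PathStyle: true}

	s1, _ := setupS3Session(creds, cfgA) // creates and caches a session
	s2, _ := setupS3Session(creds, cfgA) // cache hit within the expiry window: s1 == s2

	cfgB := cfgA
	cfgB.Endpoint = "https://s3.example.com" // any differing field is a new cache key
	sB, _ := setupS3Session(creds, cfgB)     // separate session
	_, _, _ = s1, s2, sB
}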
+ sessions map[config.S3Config]*s3Session + sync.Mutex +} + +func (s *s3Session) isExpired() bool { + return time.Now().After(s.expiry) +} + +func newS3SessionCache() *s3SessionCache { + return &s3SessionCache{sessions: make(map[config.S3Config]*s3Session)} +} + +var ( + // By default, it looks like IAM instance profiles may last 6 hours + // (via curl http://169.254.169.254/latest/meta-data/iam/security-credentials/<role_name>), + // but this may be configurable from anywhere for 15 minutes to 12 + // hours. To be safe, refresh AWS sessions every 10 minutes. + sessionExpiration = time.Duration(10 * time.Minute) + sessionCache = newS3SessionCache() +) + +// SetupS3Session initializes a new AWS S3 session and refreshes one if +// necessary. As recommended in https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html, +// sessions should be cached when possible. Sessions are safe to use +// concurrently as long as the session isn't modified. +func setupS3Session(s3Credentials config.S3Credentials, s3Config config.S3Config) (*session.Session, error) { + sessionCache.Lock() + defer sessionCache.Unlock() + + if s, ok := sessionCache.sessions[s3Config]; ok && !s.isExpired() { + return s.session, nil + } + + cfg := &aws.Config{ + Region: aws.String(s3Config.Region), + S3ForcePathStyle: aws.Bool(s3Config.PathStyle), + } + + // In case IAM profiles aren't being used, use the static credentials + if s3Credentials.AwsAccessKeyID != "" && s3Credentials.AwsSecretAccessKey != "" { + cfg.Credentials = credentials.NewStaticCredentials(s3Credentials.AwsAccessKeyID, s3Credentials.AwsSecretAccessKey, "") + } + + if s3Config.Endpoint != "" { + cfg.Endpoint = aws.String(s3Config.Endpoint) + } + + sess, err := session.NewSession(cfg) + if err != nil { + return nil, err + } + + sessionCache.sessions[s3Config] = &s3Session{ + expiry: time.Now().Add(sessionExpiration), + session: sess, + } + + return sess, nil +} + +func ResetS3Session(s3Config config.S3Config) { + sessionCache.Lock() + defer sessionCache.Unlock() + + delete(sessionCache.sessions, s3Config) +} diff --git a/workhorse/internal/upload/destination/objectstore/s3_session_test.go b/workhorse/internal/upload/destination/objectstore/s3_session_test.go new file mode 100644 index 00000000000..5d57b4f9af8 --- /dev/null +++ b/workhorse/internal/upload/destination/objectstore/s3_session_test.go @@ -0,0 +1,57 @@ +package objectstore + +import ( + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/stretchr/testify/require" + + "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" +) + +func TestS3SessionSetup(t *testing.T) { + credentials := config.S3Credentials{} + cfg := config.S3Config{Region: "us-west-1", PathStyle: true} + + sess, err := setupS3Session(credentials, cfg) + require.NoError(t, err) + + require.Equal(t, aws.StringValue(sess.Config.Region), "us-west-1") + require.True(t, aws.BoolValue(sess.Config.S3ForcePathStyle)) + + require.Equal(t, len(sessionCache.sessions), 1) + anotherConfig := cfg + _, err = setupS3Session(credentials, anotherConfig) + require.NoError(t, err) + require.Equal(t, len(sessionCache.sessions), 1) + + ResetS3Session(cfg) +} + +func TestS3SessionExpiry(t *testing.T) { + credentials := config.S3Credentials{} + cfg := config.S3Config{Region: "us-west-1", PathStyle: true} + + sess, err := setupS3Session(credentials, cfg) + require.NoError(t, err) + + require.Equal(t, aws.StringValue(sess.Config.Region), "us-west-1") + require.True(t, aws.BoolValue(sess.Config.S3ForcePathStyle)) + + firstSession, 
ok := sessionCache.sessions[cfg] + require.True(t, ok) + require.False(t, firstSession.isExpired()) + + firstSession.expiry = time.Now().Add(-1 * time.Second) + require.True(t, firstSession.isExpired()) + + _, err = setupS3Session(credentials, cfg) + require.NoError(t, err) + + nextSession, ok := sessionCache.sessions[cfg] + require.True(t, ok) + require.False(t, nextSession.isExpired()) + + ResetS3Session(cfg) +} diff --git a/workhorse/internal/upload/destination/objectstore/test/consts.go b/workhorse/internal/upload/destination/objectstore/test/consts.go new file mode 100644 index 00000000000..7a1bcc28d45 --- /dev/null +++ b/workhorse/internal/upload/destination/objectstore/test/consts.go @@ -0,0 +1,19 @@ +package test + +// Some useful const for testing purpose +const ( + // ObjectContent an example textual content + ObjectContent = "TEST OBJECT CONTENT" + // ObjectSize is the ObjectContent size + ObjectSize = int64(len(ObjectContent)) + // Objectpath is an example remote object path (including bucket name) + ObjectPath = "/bucket/object" + // ObjectMD5 is ObjectContent MD5 hash + ObjectMD5 = "42d000eea026ee0760677e506189cb33" + // ObjectSHA1 is ObjectContent SHA1 hash + ObjectSHA1 = "173cfd58c6b60cb910f68a26cbb77e3fc5017a6d" + // ObjectSHA256 is ObjectContent SHA256 hash + ObjectSHA256 = "b0257e9e657ef19b15eed4fbba975bd5238d651977564035ef91cb45693647aa" + // ObjectSHA512 is ObjectContent SHA512 hash + ObjectSHA512 = "51af8197db2047f7894652daa7437927bf831d5aa63f1b0b7277c4800b06f5e3057251f0e4c2d344ca8c2daf1ffc08a28dd3b2f5fe0e316d3fd6c3af58c34b97" +) diff --git a/workhorse/internal/upload/destination/objectstore/test/gocloud_stub.go b/workhorse/internal/upload/destination/objectstore/test/gocloud_stub.go new file mode 100644 index 00000000000..cf22075e407 --- /dev/null +++ b/workhorse/internal/upload/destination/objectstore/test/gocloud_stub.go @@ -0,0 +1,47 @@ +package test + +import ( + "context" + "io/ioutil" + "net/url" + "os" + "testing" + + "github.com/stretchr/testify/require" + "gocloud.dev/blob" + "gocloud.dev/blob/fileblob" +) + +type dirOpener struct { + tmpDir string +} + +func (o *dirOpener) OpenBucketURL(ctx context.Context, u *url.URL) (*blob.Bucket, error) { + return fileblob.OpenBucket(o.tmpDir, nil) +} + +func SetupGoCloudFileBucket(t *testing.T, scheme string) (m *blob.URLMux, bucketDir string, cleanup func()) { + tmpDir, err := ioutil.TempDir("", "") + require.NoError(t, err) + + mux := new(blob.URLMux) + fake := &dirOpener{tmpDir: tmpDir} + mux.RegisterBucket(scheme, fake) + cleanup = func() { + os.RemoveAll(tmpDir) + } + + return mux, tmpDir, cleanup +} + +func GoCloudObjectExists(t *testing.T, bucketDir string, objectName string) { + bucket, err := fileblob.OpenBucket(bucketDir, nil) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) // lint:allow context.Background + defer cancel() + + exists, err := bucket.Exists(ctx, objectName) + require.NoError(t, err) + require.True(t, exists) +} diff --git a/workhorse/internal/upload/destination/objectstore/test/objectstore_stub.go b/workhorse/internal/upload/destination/objectstore/test/objectstore_stub.go new file mode 100644 index 00000000000..d51a2de7456 --- /dev/null +++ b/workhorse/internal/upload/destination/objectstore/test/objectstore_stub.go @@ -0,0 +1,278 @@ +package test + +import ( + "crypto/md5" + "encoding/hex" + "encoding/xml" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "strconv" + "strings" + "sync" + + 
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore" +) + +type partsEtagMap map[int]string + +// ObjectstoreStub is a testing implementation of ObjectStore. +// Instead of storing objects it will just save md5sum. +type ObjectstoreStub struct { + // bucket contains md5sum of uploaded objects + bucket map[string]string + // overwriteMD5 contains overwrites for md5sum that should be return instead of the regular hash + overwriteMD5 map[string]string + // multipart is a map of MultipartUploads + multipart map[string]partsEtagMap + // HTTP header sent along request + headers map[string]*http.Header + + puts int + deletes int + + m sync.Mutex +} + +// StartObjectStore will start an ObjectStore stub +func StartObjectStore() (*ObjectstoreStub, *httptest.Server) { + return StartObjectStoreWithCustomMD5(make(map[string]string)) +} + +// StartObjectStoreWithCustomMD5 will start an ObjectStore stub: md5Hashes contains overwrites for md5sum that should be return on PutObject +func StartObjectStoreWithCustomMD5(md5Hashes map[string]string) (*ObjectstoreStub, *httptest.Server) { + os := &ObjectstoreStub{ + bucket: make(map[string]string), + multipart: make(map[string]partsEtagMap), + overwriteMD5: make(map[string]string), + headers: make(map[string]*http.Header), + } + + for k, v := range md5Hashes { + os.overwriteMD5[k] = v + } + + return os, httptest.NewServer(os) +} + +// PutsCnt counts PutObject invocations +func (o *ObjectstoreStub) PutsCnt() int { + o.m.Lock() + defer o.m.Unlock() + + return o.puts +} + +// DeletesCnt counts DeleteObject invocation of a valid object +func (o *ObjectstoreStub) DeletesCnt() int { + o.m.Lock() + defer o.m.Unlock() + + return o.deletes +} + +// GetObjectMD5 return the calculated MD5 of the object uploaded to path +// it will return an empty string if no object has been uploaded on such path +func (o *ObjectstoreStub) GetObjectMD5(path string) string { + o.m.Lock() + defer o.m.Unlock() + + return o.bucket[path] +} + +// GetHeader returns a given HTTP header of the object uploaded to the path +func (o *ObjectstoreStub) GetHeader(path, key string) string { + o.m.Lock() + defer o.m.Unlock() + + if val, ok := o.headers[path]; ok { + return val.Get(key) + } + + return "" +} + +// InitiateMultipartUpload prepare the ObjectstoreStob to receive a MultipartUpload on path +// It will return an error if a MultipartUpload is already in progress on that path +// InitiateMultipartUpload is only used during test setup. +// Workhorse's production code does not know how to initiate a multipart upload. +// +// Real S3 multipart uploads are more complicated than what we do here, +// but this is enough to verify that workhorse's production code behaves as intended. 
+func (o *ObjectstoreStub) InitiateMultipartUpload(path string) error { + o.m.Lock() + defer o.m.Unlock() + + if o.multipart[path] != nil { + return fmt.Errorf("MultipartUpload for %q already in progress", path) + } + + o.multipart[path] = make(partsEtagMap) + return nil +} + +// IsMultipartUpload check if the given path has a MultipartUpload in progress +func (o *ObjectstoreStub) IsMultipartUpload(path string) bool { + o.m.Lock() + defer o.m.Unlock() + + return o.isMultipartUpload(path) +} + +// isMultipartUpload is the lock free version of IsMultipartUpload +func (o *ObjectstoreStub) isMultipartUpload(path string) bool { + return o.multipart[path] != nil +} + +func (o *ObjectstoreStub) removeObject(w http.ResponseWriter, r *http.Request) { + o.m.Lock() + defer o.m.Unlock() + + objectPath := r.URL.Path + if o.isMultipartUpload(objectPath) { + o.deletes++ + delete(o.multipart, objectPath) + + w.WriteHeader(200) + } else if _, ok := o.bucket[objectPath]; ok { + o.deletes++ + delete(o.bucket, objectPath) + + w.WriteHeader(200) + } else { + w.WriteHeader(404) + } +} + +func (o *ObjectstoreStub) putObject(w http.ResponseWriter, r *http.Request) { + o.m.Lock() + defer o.m.Unlock() + + objectPath := r.URL.Path + + etag, overwritten := o.overwriteMD5[objectPath] + if !overwritten { + hasher := md5.New() + io.Copy(hasher, r.Body) + + checksum := hasher.Sum(nil) + etag = hex.EncodeToString(checksum) + } + + o.headers[objectPath] = &r.Header + o.puts++ + if o.isMultipartUpload(objectPath) { + pNumber := r.URL.Query().Get("partNumber") + idx, err := strconv.Atoi(pNumber) + if err != nil { + http.Error(w, fmt.Sprintf("malformed partNumber: %v", err), 400) + return + } + + o.multipart[objectPath][idx] = etag + } else { + o.bucket[objectPath] = etag + } + + w.Header().Set("ETag", etag) + w.WriteHeader(200) +} + +func MultipartUploadInternalError() *objectstore.CompleteMultipartUploadError { + return &objectstore.CompleteMultipartUploadError{Code: "InternalError", Message: "malformed object path"} +} + +func (o *ObjectstoreStub) completeMultipartUpload(w http.ResponseWriter, r *http.Request) { + o.m.Lock() + defer o.m.Unlock() + + objectPath := r.URL.Path + + multipart := o.multipart[objectPath] + if multipart == nil { + http.Error(w, "Unknown MultipartUpload", 404) + return + } + + buf, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), 500) + return + } + + var msg objectstore.CompleteMultipartUpload + err = xml.Unmarshal(buf, &msg) + if err != nil { + http.Error(w, err.Error(), 400) + return + } + + for _, part := range msg.Part { + etag := multipart[part.PartNumber] + if etag != part.ETag { + msg := fmt.Sprintf("ETag mismatch on part %d. 
Expected %q got %q", part.PartNumber, etag, part.ETag) + http.Error(w, msg, 400) + return + } + } + + etag, overwritten := o.overwriteMD5[objectPath] + if !overwritten { + etag = "CompleteMultipartUploadETag" + } + + o.bucket[objectPath] = etag + delete(o.multipart, objectPath) + + w.Header().Set("ETag", etag) + split := strings.SplitN(objectPath[1:], "/", 2) + if len(split) < 2 { + encodeXMLAnswer(w, MultipartUploadInternalError()) + return + } + + bucket := split[0] + key := split[1] + answer := objectstore.CompleteMultipartUploadResult{ + Location: r.URL.String(), + Bucket: bucket, + Key: key, + ETag: etag, + } + encodeXMLAnswer(w, answer) +} + +func encodeXMLAnswer(w http.ResponseWriter, answer interface{}) { + w.Header().Set("Content-Type", "text/xml") + + enc := xml.NewEncoder(w) + if err := enc.Encode(answer); err != nil { + http.Error(w, err.Error(), 500) + } +} + +func (o *ObjectstoreStub) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Body != nil { + defer r.Body.Close() + } + + fmt.Println("ObjectStore Stub:", r.Method, r.URL.String()) + + if r.URL.Path == "" { + http.Error(w, "No path provided", 404) + return + } + + switch r.Method { + case "DELETE": + o.removeObject(w, r) + case "PUT": + o.putObject(w, r) + case "POST": + o.completeMultipartUpload(w, r) + default: + w.WriteHeader(404) + } +} diff --git a/workhorse/internal/upload/destination/objectstore/test/objectstore_stub_test.go b/workhorse/internal/upload/destination/objectstore/test/objectstore_stub_test.go new file mode 100644 index 00000000000..8c0d52a2d79 --- /dev/null +++ b/workhorse/internal/upload/destination/objectstore/test/objectstore_stub_test.go @@ -0,0 +1,167 @@ +package test + +import ( + "fmt" + "io" + "net/http" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func doRequest(method, url string, body io.Reader) error { + req, err := http.NewRequest(method, url, body) + if err != nil { + return err + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + + return resp.Body.Close() +} + +func TestObjectStoreStub(t *testing.T) { + stub, ts := StartObjectStore() + defer ts.Close() + + require.Equal(t, 0, stub.PutsCnt()) + require.Equal(t, 0, stub.DeletesCnt()) + + objectURL := ts.URL + ObjectPath + + require.NoError(t, doRequest(http.MethodPut, objectURL, strings.NewReader(ObjectContent))) + + require.Equal(t, 1, stub.PutsCnt()) + require.Equal(t, 0, stub.DeletesCnt()) + require.Equal(t, ObjectMD5, stub.GetObjectMD5(ObjectPath)) + + require.NoError(t, doRequest(http.MethodDelete, objectURL, nil)) + + require.Equal(t, 1, stub.PutsCnt()) + require.Equal(t, 1, stub.DeletesCnt()) +} + +func TestObjectStoreStubDelete404(t *testing.T) { + stub, ts := StartObjectStore() + defer ts.Close() + + objectURL := ts.URL + ObjectPath + + req, err := http.NewRequest(http.MethodDelete, objectURL, nil) + require.NoError(t, err) + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + require.Equal(t, 404, resp.StatusCode) + + require.Equal(t, 0, stub.DeletesCnt()) +} + +func TestObjectStoreInitiateMultipartUpload(t *testing.T) { + stub, ts := StartObjectStore() + defer ts.Close() + + path := "/my-multipart" + err := stub.InitiateMultipartUpload(path) + require.NoError(t, err) + + err = stub.InitiateMultipartUpload(path) + require.Error(t, err, "second attempt to open the same MultipartUpload") +} + +func TestObjectStoreCompleteMultipartUpload(t *testing.T) { + stub, ts := StartObjectStore() + defer ts.Close() + + objectURL := 
ts.URL + ObjectPath + parts := []struct { + number int + content string + contentMD5 string + }{ + { + number: 1, + content: "first part", + contentMD5: "550cf6b6e60f65a0e3104a26e70fea42", + }, { + number: 2, + content: "second part", + contentMD5: "920b914bca0a70780b40881b8f376135", + }, + } + + stub.InitiateMultipartUpload(ObjectPath) + + require.True(t, stub.IsMultipartUpload(ObjectPath)) + require.Equal(t, 0, stub.PutsCnt()) + require.Equal(t, 0, stub.DeletesCnt()) + + // Workhorse knows nothing about S3 MultipartUpload, it receives some URLs + // from GitLab-rails and PUTs chunk of data to each of them. + // Then it completes the upload with a final POST + partPutURLs := []string{ + fmt.Sprintf("%s?partNumber=%d", objectURL, 1), + fmt.Sprintf("%s?partNumber=%d", objectURL, 2), + } + completePostURL := objectURL + + for i, partPutURL := range partPutURLs { + part := parts[i] + + require.NoError(t, doRequest(http.MethodPut, partPutURL, strings.NewReader(part.content))) + + require.Equal(t, i+1, stub.PutsCnt()) + require.Equal(t, 0, stub.DeletesCnt()) + require.Equal(t, part.contentMD5, stub.multipart[ObjectPath][part.number], "Part %d was not uploaded into ObjectStorage", part.number) + require.Empty(t, stub.GetObjectMD5(ObjectPath), "Part %d was mistakenly uploaded as a single object", part.number) + require.True(t, stub.IsMultipartUpload(ObjectPath), "MultipartUpload completed or aborted") + } + + completeBody := fmt.Sprintf(`<CompleteMultipartUpload> + <Part> + <PartNumber>1</PartNumber> + <ETag>%s</ETag> + </Part> + <Part> + <PartNumber>2</PartNumber> + <ETag>%s</ETag> + </Part> + </CompleteMultipartUpload>`, parts[0].contentMD5, parts[1].contentMD5) + require.NoError(t, doRequest(http.MethodPost, completePostURL, strings.NewReader(completeBody))) + + require.Equal(t, len(parts), stub.PutsCnt()) + require.Equal(t, 0, stub.DeletesCnt()) + require.False(t, stub.IsMultipartUpload(ObjectPath), "MultipartUpload is still in progress") +} + +func TestObjectStoreAbortMultipartUpload(t *testing.T) { + stub, ts := StartObjectStore() + defer ts.Close() + + stub.InitiateMultipartUpload(ObjectPath) + + require.True(t, stub.IsMultipartUpload(ObjectPath)) + require.Equal(t, 0, stub.PutsCnt()) + require.Equal(t, 0, stub.DeletesCnt()) + + objectURL := ts.URL + ObjectPath + require.NoError(t, doRequest(http.MethodPut, fmt.Sprintf("%s?partNumber=%d", objectURL, 1), strings.NewReader(ObjectContent))) + + require.Equal(t, 1, stub.PutsCnt()) + require.Equal(t, 0, stub.DeletesCnt()) + require.Equal(t, ObjectMD5, stub.multipart[ObjectPath][1], "Part was not uploaded into ObjectStorage") + require.Empty(t, stub.GetObjectMD5(ObjectPath), "Part was mistakenly uploaded as a single object") + require.True(t, stub.IsMultipartUpload(ObjectPath), "MultipartUpload completed or aborted") + + require.NoError(t, doRequest(http.MethodDelete, objectURL, nil)) + + require.Equal(t, 1, stub.PutsCnt()) + require.Equal(t, 1, stub.DeletesCnt()) + require.Empty(t, stub.GetObjectMD5(ObjectPath), "MultiUpload has been completed") + require.False(t, stub.IsMultipartUpload(ObjectPath), "MultiUpload is still in progress") +} diff --git a/workhorse/internal/upload/destination/objectstore/test/s3_stub.go b/workhorse/internal/upload/destination/objectstore/test/s3_stub.go new file mode 100644 index 00000000000..6b83426b852 --- /dev/null +++ b/workhorse/internal/upload/destination/objectstore/test/s3_stub.go @@ -0,0 +1,142 @@ +package test + +import ( + "io/ioutil" + "net/http/httptest" + "os" + "strings" + "testing" + + 
"github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/stretchr/testify/require" + + "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" + + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + + "github.com/johannesboyne/gofakes3" + "github.com/johannesboyne/gofakes3/backend/s3mem" +) + +func SetupS3(t *testing.T, encryption string) (config.S3Credentials, config.S3Config, *session.Session, *httptest.Server) { + return SetupS3WithBucket(t, "test-bucket", encryption) +} + +func SetupS3WithBucket(t *testing.T, bucket string, encryption string) (config.S3Credentials, config.S3Config, *session.Session, *httptest.Server) { + backend := s3mem.New() + faker := gofakes3.New(backend) + ts := httptest.NewServer(faker.Server()) + + creds := config.S3Credentials{ + AwsAccessKeyID: "YOUR-ACCESSKEYID", + AwsSecretAccessKey: "YOUR-SECRETACCESSKEY", + } + + config := config.S3Config{ + Bucket: bucket, + Endpoint: ts.URL, + Region: "eu-central-1", + PathStyle: true, + } + + if encryption != "" { + config.ServerSideEncryption = encryption + + if encryption == s3.ServerSideEncryptionAwsKms { + config.SSEKMSKeyID = "arn:aws:1234" + } + } + + sess, err := session.NewSession(&aws.Config{ + Credentials: credentials.NewStaticCredentials(creds.AwsAccessKeyID, creds.AwsSecretAccessKey, ""), + Endpoint: aws.String(ts.URL), + Region: aws.String(config.Region), + DisableSSL: aws.Bool(true), + S3ForcePathStyle: aws.Bool(true), + }) + require.NoError(t, err) + + // Create S3 service client + svc := s3.New(sess) + + _, err = svc.CreateBucket(&s3.CreateBucketInput{ + Bucket: aws.String(bucket), + }) + require.NoError(t, err) + + return creds, config, sess, ts +} + +// S3ObjectExists will fail the test if the file does not exist. +func S3ObjectExists(t *testing.T, sess *session.Session, config config.S3Config, objectName string, expectedBytes string) { + downloadObject(t, sess, config, objectName, func(tmpfile *os.File, numBytes int64, err error) { + require.NoError(t, err) + require.Equal(t, int64(len(expectedBytes)), numBytes) + + output, err := ioutil.ReadFile(tmpfile.Name()) + require.NoError(t, err) + + require.Equal(t, []byte(expectedBytes), output) + }) +} + +func CheckS3Metadata(t *testing.T, sess *session.Session, config config.S3Config, objectName string) { + // In a real S3 provider, s3crypto.NewDecryptionClient should probably be used + svc := s3.New(sess) + result, err := svc.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(config.Bucket), + Key: aws.String(objectName), + }) + require.NoError(t, err) + + if config.ServerSideEncryption != "" { + require.Equal(t, aws.String(config.ServerSideEncryption), result.ServerSideEncryption) + + if config.ServerSideEncryption == s3.ServerSideEncryptionAwsKms { + require.Equal(t, aws.String(config.SSEKMSKeyID), result.SSEKMSKeyId) + } else { + require.Nil(t, result.SSEKMSKeyId) + } + } else { + require.Nil(t, result.ServerSideEncryption) + require.Nil(t, result.SSEKMSKeyId) + } +} + +// S3ObjectDoesNotExist returns true if the object has been deleted, +// false otherwise. The return signature is different from +// S3ObjectExists because deletion may need to be retried since deferred +// clean up callsinternal/objectstore/test/s3_stub.go may cause the actual deletion to happen after the +// initial check. 
+func S3ObjectDoesNotExist(t *testing.T, sess *session.Session, config config.S3Config, objectName string) bool { + deleted := false + + downloadObject(t, sess, config, objectName, func(tmpfile *os.File, numBytes int64, err error) { + if err != nil && strings.Contains(err.Error(), "NoSuchKey") { + deleted = true + } + }) + + return deleted +} + +func downloadObject(t *testing.T, sess *session.Session, config config.S3Config, objectName string, handler func(tmpfile *os.File, numBytes int64, err error)) { + tmpDir, err := ioutil.TempDir("", "workhorse-test-") + require.NoError(t, err) + defer os.Remove(tmpDir) + + tmpfile, err := ioutil.TempFile(tmpDir, "s3-output") + require.NoError(t, err) + defer os.Remove(tmpfile.Name()) + + downloadSvc := s3manager.NewDownloader(sess) + numBytes, err := downloadSvc.Download(tmpfile, &s3.GetObjectInput{ + Bucket: aws.String(config.Bucket), + Key: aws.String(objectName), + }) + + handler(tmpfile, numBytes, err) +} diff --git a/workhorse/internal/upload/destination/objectstore/upload_strategy.go b/workhorse/internal/upload/destination/objectstore/upload_strategy.go new file mode 100644 index 00000000000..5707ba5f24e --- /dev/null +++ b/workhorse/internal/upload/destination/objectstore/upload_strategy.go @@ -0,0 +1,46 @@ +package objectstore + +import ( + "context" + "io" + "net/http" + + "gitlab.com/gitlab-org/labkit/log" + "gitlab.com/gitlab-org/labkit/mask" +) + +type uploadStrategy interface { + Upload(ctx context.Context, r io.Reader) error + ETag() string + Abort() + Delete() +} + +func deleteURL(url string) { + if url == "" { + return + } + + req, err := http.NewRequest("DELETE", url, nil) + if err != nil { + log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed") + return + } + // TODO: consider adding the context to the outgoing request for better instrumentation + + // here we are not using u.ctx because we must perform cleanup regardless of parent context + resp, err := httpClient.Do(req) + if err != nil { + log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed") + return + } + resp.Body.Close() +} + +func extractETag(rawETag string) string { + if rawETag != "" && rawETag[0] == '"' { + rawETag = rawETag[1 : len(rawETag)-1] + } + + return rawETag +} diff --git a/workhorse/internal/upload/destination/objectstore/uploader.go b/workhorse/internal/upload/destination/objectstore/uploader.go new file mode 100644 index 00000000000..aedfbe55ead --- /dev/null +++ b/workhorse/internal/upload/destination/objectstore/uploader.go @@ -0,0 +1,115 @@ +package objectstore + +import ( + "context" + "crypto/md5" + "encoding/hex" + "fmt" + "hash" + "io" + "strings" + "time" + + "gitlab.com/gitlab-org/labkit/log" +) + +// uploader consumes an io.Reader and uploads it using a pluggable uploadStrategy. +type uploader struct { + strategy uploadStrategy + + // In the case of S3 uploads, we have a multipart upload which + // instantiates uploads for the individual parts. We don't want to + // increment metrics for the individual parts, so that is why we have + // this boolean flag. + metrics bool + + // With S3 we compare the MD5 of the data we sent with the ETag returned + // by the object storage server. 
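A self-contained sketch of that ETag check: the upload stream is tee'd through an MD5 hasher, and the hex digest is compared case-insensitively with the returned ETag (see compareMD5 below):

package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"io"
	"io/ioutil"
	"strings"
)

func main() {
	hasher := md5.New()
	body := io.TeeReader(strings.NewReader("TEST OBJECT CONTENT"), hasher)
	io.Copy(ioutil.Discard, body) // stands in for the actual PUT of the body

	local := hex.EncodeToString(hasher.Sum(nil))
	remote := strings.ToUpper(local) // e.g. a server that upcases ETags
	fmt.Println(strings.EqualFold(local, remote)) // true: accepted as a match
}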
+ checkETag bool +} + +func newUploader(strategy uploadStrategy) *uploader { + return &uploader{strategy: strategy, metrics: true} +} + +func newETagCheckUploader(strategy uploadStrategy, metrics bool) *uploader { + return &uploader{strategy: strategy, metrics: metrics, checkETag: true} +} + +func hexString(h hash.Hash) string { return hex.EncodeToString(h.Sum(nil)) } + +// Consume reads the reader until it reaches EOF or an error. It spawns a +// goroutine that waits for outerCtx to be done, after which the remote +// file is deleted. The deadline applies to the upload performed inside +// Consume, not to outerCtx. +func (u *uploader) Consume(outerCtx context.Context, reader io.Reader, deadline time.Time) (_ int64, err error) { + if u.metrics { + objectStorageUploadsOpen.Inc() + defer func(started time.Time) { + objectStorageUploadsOpen.Dec() + objectStorageUploadTime.Observe(time.Since(started).Seconds()) + if err != nil { + objectStorageUploadRequestsRequestFailed.Inc() + } + }(time.Now()) + } + + defer func() { + // We do this mainly to abort S3 multipart uploads: it is not enough to + // "delete" them. + if err != nil { + u.strategy.Abort() + } + }() + + go func() { + // Once gitlab-rails is done handling the request, we are supposed to + // delete the upload from its temporary location. + <-outerCtx.Done() + u.strategy.Delete() + }() + + uploadCtx, cancelFn := context.WithDeadline(outerCtx, deadline) + defer cancelFn() + + var hasher hash.Hash + if u.checkETag { + hasher = md5.New() + reader = io.TeeReader(reader, hasher) + } + + cr := &countReader{r: reader} + if err := u.strategy.Upload(uploadCtx, cr); err != nil { + return cr.n, err + } + + if u.checkETag { + if err := compareMD5(hexString(hasher), u.strategy.ETag()); err != nil { + log.ContextLogger(uploadCtx).WithError(err).Error("error comparing MD5 checksum") + return cr.n, err + } + } + + objectStorageUploadBytes.Add(float64(cr.n)) + + return cr.n, nil +} + +func compareMD5(local, remote string) error { + if !strings.EqualFold(local, remote) { + return fmt.Errorf("ETag mismatch. 
expected %q got %q", local, remote) + } + + return nil +} + +type countReader struct { + r io.Reader + n int64 +} + +func (cr *countReader) Read(p []byte) (int, error) { + nRead, err := cr.r.Read(p) + cr.n += int64(nRead) + return nRead, err +} diff --git a/workhorse/internal/upload/destination/reader.go b/workhorse/internal/upload/destination/reader.go new file mode 100644 index 00000000000..925a9468e14 --- /dev/null +++ b/workhorse/internal/upload/destination/reader.go @@ -0,0 +1,17 @@ +package destination + +import "io" + +type hardLimitReader struct { + r io.Reader + n int64 +} + +func (h *hardLimitReader) Read(p []byte) (int, error) { + nRead, err := h.r.Read(p) + h.n -= int64(nRead) + if h.n < 0 { + err = ErrEntityTooLarge + } + return nRead, err +} diff --git a/workhorse/internal/upload/destination/reader_test.go b/workhorse/internal/upload/destination/reader_test.go new file mode 100644 index 00000000000..a26f7746a13 --- /dev/null +++ b/workhorse/internal/upload/destination/reader_test.go @@ -0,0 +1,46 @@ +package destination + +import ( + "fmt" + "io/ioutil" + "strings" + "testing" + "testing/iotest" + + "github.com/stretchr/testify/require" +) + +func TestHardLimitReader(t *testing.T) { + const text = "hello world" + r := iotest.OneByteReader( + &hardLimitReader{ + r: strings.NewReader(text), + n: int64(len(text)), + }, + ) + + out, err := ioutil.ReadAll(r) + require.NoError(t, err) + require.Equal(t, text, string(out)) +} + +func TestHardLimitReaderFail(t *testing.T) { + const text = "hello world" + + for bufSize := len(text) / 2; bufSize < len(text)*2; bufSize++ { + t.Run(fmt.Sprintf("bufsize:%d", bufSize), func(t *testing.T) { + r := &hardLimitReader{ + r: iotest.DataErrReader(strings.NewReader(text)), + n: int64(len(text)) - 1, + } + buf := make([]byte, bufSize) + + var err error + for i := 0; err == nil && i < 1000; i++ { + _, err = r.Read(buf) + } + + require.Equal(t, ErrEntityTooLarge, err) + }) + } +} diff --git a/workhorse/internal/upload/destination/upload_opts.go b/workhorse/internal/upload/destination/upload_opts.go new file mode 100644 index 00000000000..750a79d7bc2 --- /dev/null +++ b/workhorse/internal/upload/destination/upload_opts.go @@ -0,0 +1,171 @@ +package destination + +import ( + "errors" + "strings" + "time" + + "gocloud.dev/blob" + + "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" +) + +// DefaultObjectStoreTimeout is the timeout for ObjectStore upload operation +const DefaultObjectStoreTimeout = 4 * time.Hour + +type ObjectStorageConfig struct { + Provider string + + S3Credentials config.S3Credentials + S3Config config.S3Config + + // GoCloud mux that maps azureblob:// and future URLs (e.g. s3://, gcs://, etc.) to a handler + URLMux *blob.URLMux + + // Azure credentials are registered at startup in the GoCloud URLMux, so only the container name is needed + GoCloudConfig config.GoCloudConfig +} + +// UploadOpts represents all the options available for saving a file to object store +type UploadOpts struct { + // TempFilePrefix is the prefix used to create temporary local file + TempFilePrefix string + // LocalTempPath is the directory where to write a local copy of the file + LocalTempPath string + // RemoteID is the remote ObjectID provided by GitLab + RemoteID string + // RemoteURL is the final URL of the file + RemoteURL string + // PresignedPut is a presigned S3 PutObject compatible URL + PresignedPut string + // PresignedDelete is a presigned S3 DeleteObject compatible URL. 
+ PresignedDelete string
+ // PutHeaders are the HTTP headers to be sent along with the PUT request
+ PutHeaders map[string]string
+ // UseWorkhorseClient, when true, ignores Rails pre-signed URLs and has Workhorse access the object storage provider directly
+ UseWorkhorseClient bool
+ // If UseWorkhorseClient is true, this is the temporary object name under which to store the file
+ RemoteTempObjectID string
+ // Workhorse object storage client (e.g. S3) parameters
+ ObjectStorageConfig ObjectStorageConfig
+ // Deadline is the deadline for the object storage operation; the upload is aborted if it does not complete in time
+ Deadline time.Time
+ // MaximumSize is the maximum accepted size in bytes of the upload
+ MaximumSize int64
+
+ // MultipartUpload parameters
+ // PartSize is the exact size of each uploaded part. Only the last one can be smaller
+ PartSize int64
+ // PresignedParts contains the presigned URLs for each part
+ PresignedParts []string
+ // PresignedCompleteMultipart is a presigned URL for CompleteMultipartUpload
+ PresignedCompleteMultipart string
+ // PresignedAbortMultipart is a presigned URL for AbortMultipartUpload
+ PresignedAbortMultipart string
+}
+
+// UseWorkhorseClientEnabled checks if the options require direct access to object storage
+func (s *UploadOpts) UseWorkhorseClientEnabled() bool {
+ return s.UseWorkhorseClient && s.ObjectStorageConfig.IsValid() && s.RemoteTempObjectID != ""
+}
+
+// IsLocal checks if the options require the writing of the file on disk
+func (s *UploadOpts) IsLocal() bool {
+ return s.LocalTempPath != ""
+}
+
+// IsMultipart checks if the options require a multipart upload
+func (s *UploadOpts) IsMultipart() bool {
+ return s.PartSize > 0
+}
+
+// GetOpts converts a GitLab api.Response to a proper UploadOpts
+func GetOpts(apiResponse *api.Response) (*UploadOpts, error) {
+ timeout := time.Duration(apiResponse.RemoteObject.Timeout) * time.Second
+ if timeout == 0 {
+ timeout = DefaultObjectStoreTimeout
+ }
+
+ opts := UploadOpts{
+ LocalTempPath: apiResponse.TempPath,
+ RemoteID: apiResponse.RemoteObject.ID,
+ RemoteURL: apiResponse.RemoteObject.GetURL,
+ PresignedPut: apiResponse.RemoteObject.StoreURL,
+ PresignedDelete: apiResponse.RemoteObject.DeleteURL,
+ PutHeaders: apiResponse.RemoteObject.PutHeaders,
+ UseWorkhorseClient: apiResponse.RemoteObject.UseWorkhorseClient,
+ RemoteTempObjectID: apiResponse.RemoteObject.RemoteTempObjectID,
+ Deadline: time.Now().Add(timeout),
+ MaximumSize: apiResponse.MaximumSize,
+ }
+
+ if opts.LocalTempPath != "" && opts.RemoteID != "" {
+ return nil, errors.New("API response has both TempPath and RemoteObject")
+ }
+
+ if opts.LocalTempPath == "" && opts.RemoteID == "" {
+ return nil, errors.New("API response has neither TempPath nor RemoteObject")
+ }
+
+ objectStorageParams := apiResponse.RemoteObject.ObjectStorage
+ if opts.UseWorkhorseClient && objectStorageParams != nil {
+ opts.ObjectStorageConfig.Provider = objectStorageParams.Provider
+ opts.ObjectStorageConfig.S3Config = objectStorageParams.S3Config
+ opts.ObjectStorageConfig.GoCloudConfig = objectStorageParams.GoCloudConfig
+ }
+
+ // Backwards compatibility to ensure API servers that do not include the
+ // CustomPutHeaders flag will default to the original content type.
+ if !apiResponse.RemoteObject.CustomPutHeaders {
+ opts.PutHeaders = make(map[string]string)
+ opts.PutHeaders["Content-Type"] = "application/octet-stream"
+ }
+
+ if multiParams := apiResponse.RemoteObject.MultipartUpload; multiParams != nil {
+ opts.PartSize = multiParams.PartSize
+ opts.PresignedCompleteMultipart = multiParams.CompleteURL
+ opts.PresignedAbortMultipart = multiParams.AbortURL
+ opts.PresignedParts = append([]string(nil), multiParams.PartURLs...)
+ }
+
+ return &opts, nil
+}
+
+func (c *ObjectStorageConfig) IsAWS() bool {
+ return strings.EqualFold(c.Provider, "AWS") || strings.EqualFold(c.Provider, "S3")
+}
+
+func (c *ObjectStorageConfig) IsAzure() bool {
+ return strings.EqualFold(c.Provider, "AzureRM")
+}
+
+func (c *ObjectStorageConfig) IsGoCloud() bool {
+ return c.GoCloudConfig.URL != ""
+}
+
+func (c *ObjectStorageConfig) IsValid() bool {
+ if c.IsAWS() {
+ return c.S3Config.Bucket != "" && c.s3CredentialsValid()
+ } else if c.IsGoCloud() {
+ // We could parse and validate the URL, but GoCloud providers
+ // such as AzureRM don't have a fallback to normal HTTP, so we
+ // always want to try the GoCloud path if there is a URL.
+ return true
+ }
+
+ return false
+}
+
+func (c *ObjectStorageConfig) s3CredentialsValid() bool {
+ // We need to distinguish between two ways of accessing AWS:
+ // 1. A static access key and secret configured in Workhorse
+ // 2. An IAM instance profile, in which case no credentials are
+ // configured in Workhorse
+ if c.S3Config.UseIamProfile {
+ return true
+ } else if c.S3Credentials.AwsAccessKeyID != "" && c.S3Credentials.AwsSecretAccessKey != "" {
+ return true
+ }
+
+ return false
+}
diff --git a/workhorse/internal/upload/destination/upload_opts_test.go b/workhorse/internal/upload/destination/upload_opts_test.go
new file mode 100644
index 00000000000..fde726c985d
--- /dev/null
+++ b/workhorse/internal/upload/destination/upload_opts_test.go
@@ -0,0 +1,342 @@
+package destination_test
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
+)
+
+func TestUploadOptsLocalAndRemote(t *testing.T) {
+ tests := []struct {
+ name string
+ localTempPath string
+ presignedPut string
+ partSize int64
+ isLocal bool
+ isRemote bool
+ isMultipart bool
+ }{
+ {
+ name: "Only LocalTempPath",
+ localTempPath: "/tmp",
+ isLocal: true,
+ },
+ {
+ name: "No paths",
+ },
+ {
+ name: "Only remoteUrl",
+ presignedPut: "http://example.com",
+ },
+ {
+ name: "Multipart",
+ partSize: 10,
+ isMultipart: true,
+ },
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ opts := destination.UploadOpts{
+ LocalTempPath: test.localTempPath,
+ PresignedPut: test.presignedPut,
+ PartSize: test.partSize,
+ }
+
+ require.Equal(t, test.isLocal, opts.IsLocal(), "IsLocal() mismatch")
+ require.Equal(t, test.isMultipart, opts.IsMultipart(), "IsMultipart() mismatch")
+ })
+ }
+}
+
+func TestGetOpts(t *testing.T) {
+ tests := []struct {
+ name string
+ multipart *api.MultipartUploadParams
+ customPutHeaders bool
+ putHeaders map[string]string
+ }{
+ {
+ name: "Single upload",
+ },
+ {
+ name: "Multipart upload",
+ multipart: &api.MultipartUploadParams{
+ PartSize: 10,
+ CompleteURL: "http://complete",
+ AbortURL: "http://abort",
+ PartURLs: []string{"http://part1", "http://part2"},
+ },
+
}, + { + name: "Single upload with custom content type", + customPutHeaders: true, + putHeaders: map[string]string{"Content-Type": "image/jpeg"}, + }, { + name: "Multipart upload with custom content type", + multipart: &api.MultipartUploadParams{ + PartSize: 10, + CompleteURL: "http://complete", + AbortURL: "http://abort", + PartURLs: []string{"http://part1", "http://part2"}, + }, + customPutHeaders: true, + putHeaders: map[string]string{"Content-Type": "image/jpeg"}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + apiResponse := &api.Response{ + RemoteObject: api.RemoteObject{ + Timeout: 10, + ID: "id", + GetURL: "http://get", + StoreURL: "http://store", + DeleteURL: "http://delete", + MultipartUpload: test.multipart, + CustomPutHeaders: test.customPutHeaders, + PutHeaders: test.putHeaders, + }, + } + deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second) + opts, err := destination.GetOpts(apiResponse) + require.NoError(t, err) + + require.Equal(t, apiResponse.TempPath, opts.LocalTempPath) + require.WithinDuration(t, deadline, opts.Deadline, time.Second) + require.Equal(t, apiResponse.RemoteObject.ID, opts.RemoteID) + require.Equal(t, apiResponse.RemoteObject.GetURL, opts.RemoteURL) + require.Equal(t, apiResponse.RemoteObject.StoreURL, opts.PresignedPut) + require.Equal(t, apiResponse.RemoteObject.DeleteURL, opts.PresignedDelete) + if test.customPutHeaders { + require.Equal(t, opts.PutHeaders, apiResponse.RemoteObject.PutHeaders) + } else { + require.Equal(t, opts.PutHeaders, map[string]string{"Content-Type": "application/octet-stream"}) + } + + if test.multipart == nil { + require.False(t, opts.IsMultipart()) + require.Empty(t, opts.PresignedCompleteMultipart) + require.Empty(t, opts.PresignedAbortMultipart) + require.Zero(t, opts.PartSize) + require.Empty(t, opts.PresignedParts) + } else { + require.True(t, opts.IsMultipart()) + require.Equal(t, test.multipart.CompleteURL, opts.PresignedCompleteMultipart) + require.Equal(t, test.multipart.AbortURL, opts.PresignedAbortMultipart) + require.Equal(t, test.multipart.PartSize, opts.PartSize) + require.Equal(t, test.multipart.PartURLs, opts.PresignedParts) + } + }) + } +} + +func TestGetOptsFail(t *testing.T) { + testCases := []struct { + desc string + in *api.Response + }{ + { + desc: "neither local nor remote", + in: &api.Response{}, + }, + { + desc: "both local and remote", + in: &api.Response{TempPath: "/foobar", RemoteObject: api.RemoteObject{ID: "id"}}, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + _, err := destination.GetOpts(tc.in) + require.Error(t, err, "expect input to be rejected") + }) + } +} + +func TestGetOptsDefaultTimeout(t *testing.T) { + deadline := time.Now().Add(destination.DefaultObjectStoreTimeout) + opts, err := destination.GetOpts(&api.Response{TempPath: "/foo/bar"}) + require.NoError(t, err) + + require.WithinDuration(t, deadline, opts.Deadline, time.Minute) +} + +func TestUseWorkhorseClientEnabled(t *testing.T) { + cfg := destination.ObjectStorageConfig{ + Provider: "AWS", + S3Config: config.S3Config{ + Bucket: "test-bucket", + Region: "test-region", + }, + S3Credentials: config.S3Credentials{ + AwsAccessKeyID: "test-key", + AwsSecretAccessKey: "test-secret", + }, + } + + missingCfg := cfg + missingCfg.S3Credentials = config.S3Credentials{} + + iamConfig := missingCfg + iamConfig.S3Config.UseIamProfile = true + + missingRegion := cfg + missingRegion.S3Config.Region = "" + + tests := []struct { + name string + 
UseWorkhorseClient bool + remoteTempObjectID string + objectStorageConfig destination.ObjectStorageConfig + expected bool + }{ + { + name: "all direct access settings used", + UseWorkhorseClient: true, + remoteTempObjectID: "test-object", + objectStorageConfig: cfg, + expected: true, + }, + { + name: "missing AWS credentials", + UseWorkhorseClient: true, + remoteTempObjectID: "test-object", + objectStorageConfig: missingCfg, + expected: false, + }, + { + name: "direct access disabled", + UseWorkhorseClient: false, + remoteTempObjectID: "test-object", + objectStorageConfig: cfg, + expected: false, + }, + { + name: "with IAM instance profile", + UseWorkhorseClient: true, + remoteTempObjectID: "test-object", + objectStorageConfig: iamConfig, + expected: true, + }, + { + name: "missing remote temp object ID", + UseWorkhorseClient: true, + remoteTempObjectID: "", + objectStorageConfig: cfg, + expected: false, + }, + { + name: "missing S3 config", + UseWorkhorseClient: true, + remoteTempObjectID: "test-object", + expected: false, + }, + { + name: "missing S3 bucket", + UseWorkhorseClient: true, + remoteTempObjectID: "test-object", + objectStorageConfig: destination.ObjectStorageConfig{ + Provider: "AWS", + S3Config: config.S3Config{}, + }, + expected: false, + }, + { + name: "missing S3 region", + UseWorkhorseClient: true, + remoteTempObjectID: "test-object", + objectStorageConfig: missingRegion, + expected: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + apiResponse := &api.Response{ + RemoteObject: api.RemoteObject{ + Timeout: 10, + ID: "id", + UseWorkhorseClient: test.UseWorkhorseClient, + RemoteTempObjectID: test.remoteTempObjectID, + }, + } + deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second) + opts, err := destination.GetOpts(apiResponse) + require.NoError(t, err) + opts.ObjectStorageConfig = test.objectStorageConfig + + require.Equal(t, apiResponse.TempPath, opts.LocalTempPath) + require.WithinDuration(t, deadline, opts.Deadline, time.Second) + require.Equal(t, apiResponse.RemoteObject.ID, opts.RemoteID) + require.Equal(t, apiResponse.RemoteObject.UseWorkhorseClient, opts.UseWorkhorseClient) + require.Equal(t, test.expected, opts.UseWorkhorseClientEnabled()) + }) + } +} + +func TestGoCloudConfig(t *testing.T) { + mux, _, cleanup := test.SetupGoCloudFileBucket(t, "azblob") + defer cleanup() + + tests := []struct { + name string + provider string + url string + valid bool + }{ + { + name: "valid AzureRM config", + provider: "AzureRM", + url: "azblob:://test-container", + valid: true, + }, + { + name: "invalid GoCloud scheme", + provider: "AzureRM", + url: "unknown:://test-container", + valid: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + apiResponse := &api.Response{ + RemoteObject: api.RemoteObject{ + Timeout: 10, + ID: "id", + UseWorkhorseClient: true, + RemoteTempObjectID: "test-object", + ObjectStorage: &api.ObjectStorageParams{ + Provider: test.provider, + GoCloudConfig: config.GoCloudConfig{ + URL: test.url, + }, + }, + }, + } + deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second) + opts, err := destination.GetOpts(apiResponse) + require.NoError(t, err) + opts.ObjectStorageConfig.URLMux = mux + + require.Equal(t, apiResponse.TempPath, opts.LocalTempPath) + require.Equal(t, apiResponse.RemoteObject.RemoteTempObjectID, opts.RemoteTempObjectID) + require.WithinDuration(t, deadline, opts.Deadline, time.Second) + require.Equal(t, 
apiResponse.RemoteObject.ID, opts.RemoteID)
+ require.Equal(t, apiResponse.RemoteObject.UseWorkhorseClient, opts.UseWorkhorseClient)
+ require.Equal(t, test.provider, opts.ObjectStorageConfig.Provider)
+ require.Equal(t, apiResponse.RemoteObject.ObjectStorage.GoCloudConfig, opts.ObjectStorageConfig.GoCloudConfig)
+ require.True(t, opts.UseWorkhorseClientEnabled())
+ require.Equal(t, test.valid, opts.ObjectStorageConfig.IsValid())
+ require.False(t, opts.IsLocal())
+ })
+ }
+}
diff --git a/workhorse/internal/upload/lfs_preparer.go b/workhorse/internal/upload/lfs_preparer.go
new file mode 100644
index 00000000000..e7c5cf16a30
--- /dev/null
+++ b/workhorse/internal/upload/lfs_preparer.go
@@ -0,0 +1,47 @@
+package upload
+
+import (
+ "fmt"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
+)
+
+type object struct {
+ size int64
+ oid string
+}
+
+func (l *object) Verify(fh *destination.FileHandler) error {
+ if fh.Size != l.size {
+ return fmt.Errorf("LFSObject: expected size %d, wrote %d", l.size, fh.Size)
+ }
+
+ if fh.SHA256() != l.oid {
+ return fmt.Errorf("LFSObject: expected sha256 %s, got %s", l.oid, fh.SHA256())
+ }
+
+ return nil
+}
+
+type uploadPreparer struct {
+ objectPreparer Preparer
+}
+
+// NewLfsPreparer returns a new preparer instance which adds the capability
+// to a wrapped preparer to set options required for an LFS upload.
+func NewLfsPreparer(c config.Config, objectPreparer Preparer) Preparer {
+ return &uploadPreparer{objectPreparer: objectPreparer}
+}
+
+func (l *uploadPreparer) Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) {
+ opts, _, err := l.objectPreparer.Prepare(a)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ opts.TempFilePrefix = a.LfsOid
+
+ return opts, &object{oid: a.LfsOid, size: a.LfsSize}, nil
+}
diff --git a/workhorse/internal/upload/lfs_preparer_test.go b/workhorse/internal/upload/lfs_preparer_test.go
new file mode 100644
index 00000000000..6be4a7c2955
--- /dev/null
+++ b/workhorse/internal/upload/lfs_preparer_test.go
@@ -0,0 +1,59 @@
+package upload
+
+import (
+ "testing"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestLfsPreparerWithConfig(t *testing.T) {
+ lfsOid := "abcd1234"
+ creds := config.S3Credentials{
+ AwsAccessKeyID: "test-key",
+ AwsSecretAccessKey: "test-secret",
+ }
+
+ c := config.Config{
+ ObjectStorageCredentials: config.ObjectStorageCredentials{
+ Provider: "AWS",
+ S3Credentials: creds,
+ },
+ }
+
+ r := &api.Response{
+ LfsOid: lfsOid,
+ RemoteObject: api.RemoteObject{
+ ID: "the upload ID",
+ UseWorkhorseClient: true,
+ ObjectStorage: &api.ObjectStorageParams{
+ Provider: "AWS",
+ },
+ },
+ }
+
+ uploadPreparer := NewObjectStoragePreparer(c)
+ lfsPreparer := NewLfsPreparer(c, uploadPreparer)
+ opts, verifier, err := lfsPreparer.Prepare(r)
+
+ require.NoError(t, err)
+ require.Equal(t, lfsOid, opts.TempFilePrefix)
+ require.True(t, opts.ObjectStorageConfig.IsAWS())
+ require.True(t, opts.UseWorkhorseClient)
+ require.Equal(t, creds, opts.ObjectStorageConfig.S3Credentials)
+ require.NotNil(t, verifier)
+}
+
+func TestLfsPreparerWithNoConfig(t *testing.T) {
+ c := config.Config{}
+ r := &api.Response{RemoteObject: api.RemoteObject{ID: "the upload ID"}}
+ uploadPreparer := NewObjectStoragePreparer(c)
+ lfsPreparer := NewLfsPreparer(c, uploadPreparer)
+ opts,
verifier, err := lfsPreparer.Prepare(r) + + require.NoError(t, err) + require.False(t, opts.UseWorkhorseClient) + require.NotNil(t, verifier) +} diff --git a/workhorse/internal/upload/accelerate.go b/workhorse/internal/upload/multipart_uploader.go index 28d3b3dee2e..d0097f9e153 100644 --- a/workhorse/internal/upload/accelerate.go +++ b/workhorse/internal/upload/multipart_uploader.go @@ -4,19 +4,10 @@ import ( "fmt" "net/http" - "github.com/golang-jwt/jwt/v4" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" ) -const RewrittenFieldsHeader = "Gitlab-Workhorse-Multipart-Fields" - -type MultipartClaims struct { - RewrittenFields map[string]string `json:"rewritten_fields"` - jwt.StandardClaims -} - // Multipart is a request middleware. If the request has a MIME multipart // request body, the middleware will iterate through the multipart parts. // When it finds a file part (filename != ""), the middleware will save @@ -32,6 +23,6 @@ func Multipart(rails PreAuthorizer, h http.Handler, p Preparer) http.Handler { return } - InterceptMultipartFiles(w, r, h, a, s, opts) + interceptMultipartFiles(w, r, h, a, s, opts) }, "/authorize") } diff --git a/workhorse/internal/upload/object_storage_preparer.go b/workhorse/internal/upload/object_storage_preparer.go index 241cfc2d9d2..f28f598c895 100644 --- a/workhorse/internal/upload/object_storage_preparer.go +++ b/workhorse/internal/upload/object_storage_preparer.go @@ -3,7 +3,7 @@ package upload import ( "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination" ) type ObjectStoragePreparer struct { @@ -11,12 +11,15 @@ type ObjectStoragePreparer struct { credentials config.ObjectStorageCredentials } +// NewObjectStoragePreparer returns a new preparer instance which is responsible for +// setting the object storage credentials and settings needed by an uploader +// to upload to object storage. func NewObjectStoragePreparer(c config.Config) Preparer { return &ObjectStoragePreparer{credentials: c.ObjectStorageCredentials, config: c.ObjectStorageConfig} } -func (p *ObjectStoragePreparer) Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error) { - opts, err := filestore.GetOpts(a) +func (p *ObjectStoragePreparer) Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) { + opts, err := destination.GetOpts(a) if err != nil { return nil, nil, err } diff --git a/workhorse/internal/upload/preparer.go b/workhorse/internal/upload/preparer.go new file mode 100644 index 00000000000..46a4cac01b5 --- /dev/null +++ b/workhorse/internal/upload/preparer.go @@ -0,0 +1,33 @@ +package upload + +import ( + "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination" +) + +// Verifier is an optional pluggable behavior for upload paths. If +// Verify() returns an error, Workhorse will return an error response to +// the client instead of propagating the request to Rails. The motivating +// use case is Git LFS, where Workhorse checks the size and SHA256 +// checksum of the uploaded file. 
+type Verifier interface { + // Verify can abort the upload by returning an error + Verify(handler *destination.FileHandler) error +} + +// Preparer is a pluggable behavior that interprets a Rails API response +// and either tells Workhorse how to handle the upload, via the +// UploadOpts and Verifier, or it rejects the request by returning a +// non-nil error. Its intended use is to make sure the upload gets stored +// in the right location: either a local directory, or one of several +// supported object storage backends. +type Preparer interface { + Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) +} + +type DefaultPreparer struct{} + +func (s *DefaultPreparer) Prepare(a *api.Response) (*destination.UploadOpts, Verifier, error) { + opts, err := destination.GetOpts(a) + return opts, nil, err +} diff --git a/workhorse/internal/upload/rewrite.go b/workhorse/internal/upload/rewrite.go index bbabe840ef5..ff5190226af 100644 --- a/workhorse/internal/upload/rewrite.go +++ b/workhorse/internal/upload/rewrite.go @@ -21,8 +21,8 @@ import ( "golang.org/x/image/tiff" "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore" "gitlab.com/gitlab-org/gitlab/workhorse/internal/lsif_transformer/parser" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/exif" ) @@ -68,7 +68,7 @@ type rewriter struct { finalizedFields map[string]bool } -func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, preauth *api.Response, filter MultipartFormProcessor, opts *filestore.SaveFileOpts) error { +func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, preauth *api.Response, filter MultipartFormProcessor, opts *destination.UploadOpts) error { // Create multipart reader reader, err := r.MultipartReader() if err != nil { @@ -128,7 +128,7 @@ func parseAndNormalizeContentDisposition(header textproto.MIMEHeader) (string, s return params["name"], params["filename"] } -func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipart.Part, opts *filestore.SaveFileOpts) error { +func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipart.Part, opts *destination.UploadOpts) error { if rew.filter.Count() >= maxFilesAllowed { return ErrTooManyFilesUploaded } @@ -164,10 +164,10 @@ func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipa defer inputReader.Close() - fh, err := filestore.SaveFileFromReader(ctx, inputReader, -1, opts) + fh, err := destination.Upload(ctx, inputReader, -1, opts) if err != nil { switch err { - case filestore.ErrEntityTooLarge, exif.ErrRemovingExif: + case destination.ErrEntityTooLarge, exif.ErrRemovingExif: return err default: return fmt.Errorf("persisting multipart file: %v", err) diff --git a/workhorse/internal/upload/saved_file_tracker.go b/workhorse/internal/upload/saved_file_tracker.go index e6f9a8c9a88..b70a303a4a4 100644 --- a/workhorse/internal/upload/saved_file_tracker.go +++ b/workhorse/internal/upload/saved_file_tracker.go @@ -6,8 +6,8 @@ import ( "mime/multipart" "net/http" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore" "gitlab.com/gitlab-org/gitlab/workhorse/internal/secret" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination" ) type SavedFileTracker struct { @@ -26,7 +26,7 @@ func (s *SavedFileTracker) Count() int { return len(s.rewrittenFields) } -func (s *SavedFileTracker) ProcessFile(_ context.Context, 
fieldName string, file *filestore.FileHandler, _ *multipart.Writer) error { +func (s *SavedFileTracker) ProcessFile(_ context.Context, fieldName string, file *destination.FileHandler, _ *multipart.Writer) error { if _, ok := s.rewrittenFields[fieldName]; ok { return fmt.Errorf("the %v field has already been processed", fieldName) } diff --git a/workhorse/internal/upload/saved_file_tracker_test.go b/workhorse/internal/upload/saved_file_tracker_test.go index ba927db253e..4f323bf8888 100644 --- a/workhorse/internal/upload/saved_file_tracker_test.go +++ b/workhorse/internal/upload/saved_file_tracker_test.go @@ -10,8 +10,8 @@ import ( "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore" "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination" ) func TestSavedFileTracking(t *testing.T) { @@ -23,7 +23,7 @@ func TestSavedFileTracking(t *testing.T) { tracker := SavedFileTracker{Request: r} require.Equal(t, "accelerate", tracker.Name()) - file := &filestore.FileHandler{} + file := &destination.FileHandler{} ctx := context.Background() tracker.ProcessFile(ctx, "test", file, nil) require.Equal(t, 1, tracker.Count()) @@ -40,7 +40,7 @@ func TestSavedFileTracking(t *testing.T) { func TestDuplicatedFileProcessing(t *testing.T) { tracker := SavedFileTracker{} - file := &filestore.FileHandler{} + file := &destination.FileHandler{} require.NoError(t, tracker.ProcessFile(context.Background(), "file", file, nil)) diff --git a/workhorse/internal/upload/uploads.go b/workhorse/internal/upload/uploads.go index 1806e7563ce..8272a3d920d 100644 --- a/workhorse/internal/upload/uploads.go +++ b/workhorse/internal/upload/uploads.go @@ -8,27 +8,39 @@ import ( "mime/multipart" "net/http" + "github.com/golang-jwt/jwt/v4" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/filestore" "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/exif" "gitlab.com/gitlab-org/gitlab/workhorse/internal/zipartifacts" ) +const RewrittenFieldsHeader = "Gitlab-Workhorse-Multipart-Fields" + +type PreAuthorizer interface { + PreAuthorizeHandler(next api.HandleFunc, suffix string) http.Handler +} + +type MultipartClaims struct { + RewrittenFields map[string]string `json:"rewritten_fields"` + jwt.StandardClaims +} + // MultipartFormProcessor abstracts away implementation differences // between generic MIME multipart file uploads and CI artifact uploads. type MultipartFormProcessor interface { - ProcessFile(ctx context.Context, formName string, file *filestore.FileHandler, writer *multipart.Writer) error + ProcessFile(ctx context.Context, formName string, file *destination.FileHandler, writer *multipart.Writer) error ProcessField(ctx context.Context, formName string, writer *multipart.Writer) error Finalize(ctx context.Context) error Name() string Count() int } -// InterceptMultipartFiles is the core of the implementation of -// Multipart. Because it is also used for CI artifact uploads it is a -// public function. -func InterceptMultipartFiles(w http.ResponseWriter, r *http.Request, h http.Handler, preauth *api.Response, filter MultipartFormProcessor, opts *filestore.SaveFileOpts) { +// interceptMultipartFiles is the core of the implementation of +// Multipart. 
+func interceptMultipartFiles(w http.ResponseWriter, r *http.Request, h http.Handler, preauth *api.Response, filter MultipartFormProcessor, opts *destination.UploadOpts) { var body bytes.Buffer writer := multipart.NewWriter(&body) defer writer.Close() @@ -43,7 +55,7 @@ func InterceptMultipartFiles(w http.ResponseWriter, r *http.Request, h http.Hand helper.CaptureAndFail(w, r, err, err.Error(), http.StatusBadRequest) case http.ErrNotMultipart: h.ServeHTTP(w, r) - case filestore.ErrEntityTooLarge: + case destination.ErrEntityTooLarge: helper.RequestEntityTooLarge(w, r, err) case zipartifacts.ErrBadMetadata: helper.RequestEntityTooLarge(w, r, err) diff --git a/workhorse/internal/upload/uploads_test.go b/workhorse/internal/upload/uploads_test.go index f262bf94b08..9d787b10d1c 100644 --- a/workhorse/internal/upload/uploads_test.go +++ b/workhorse/internal/upload/uploads_test.go @@ -22,9 +22,9 @@ import ( "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/objectstore/test" "gitlab.com/gitlab-org/gitlab/workhorse/internal/proxy" "gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upstream/roundtripper" ) @@ -78,7 +78,7 @@ func TestUploadHandlerForwardingRawData(t *testing.T) { opts, _, err := preparer.Prepare(apiResponse) require.NoError(t, err) - InterceptMultipartFiles(response, httpRequest, handler, apiResponse, nil, opts) + interceptMultipartFiles(response, httpRequest, handler, apiResponse, nil, opts) require.Equal(t, 202, response.Code) require.Equal(t, "RESPONSE", response.Body.String(), "response body") @@ -149,7 +149,7 @@ func TestUploadHandlerRewritingMultiPartData(t *testing.T) { opts, _, err := preparer.Prepare(apiResponse) require.NoError(t, err) - InterceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts) + interceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts) require.Equal(t, 202, response.Code) cancel() // this will trigger an async cleanup @@ -218,7 +218,7 @@ func TestUploadHandlerDetectingInjectedMultiPartData(t *testing.T) { opts, _, err := preparer.Prepare(apiResponse) require.NoError(t, err) - InterceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts) + interceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts) require.Equal(t, test.response, response.Code) cancel() // this will trigger an async cleanup @@ -248,7 +248,7 @@ func TestUploadProcessingField(t *testing.T) { opts, _, err := preparer.Prepare(apiResponse) require.NoError(t, err) - InterceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts) + interceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts) require.Equal(t, 500, response.Code) } @@ -279,7 +279,7 @@ func TestUploadingMultipleFiles(t *testing.T) { opts, _, err := preparer.Prepare(apiResponse) require.NoError(t, err) - InterceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts) + interceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts) require.Equal(t, 400, response.Code) require.Equal(t, "upload request contains more than 10 files\n", response.Body.String()) @@ -335,7 +335,7 @@ func 
TestUploadProcessingFile(t *testing.T) {
 opts, _, err := preparer.Prepare(apiResponse)
 require.NoError(t, err)
- InterceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts)
+ interceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts)
 require.Equal(t, 200, response.Code)
 })
@@ -381,7 +381,7 @@ func TestInvalidFileNames(t *testing.T) {
 opts, _, err := preparer.Prepare(apiResponse)
 require.NoError(t, err)
- InterceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &SavedFileTracker{Request: httpRequest}, opts)
+ interceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &SavedFileTracker{Request: httpRequest}, opts)
 require.Equal(t, testCase.code, response.Code)
 require.Equal(t, testCase.expectedPrefix, opts.TempFilePrefix)
 }
@@ -447,7 +447,7 @@ func TestContentDispositionRewrite(t *testing.T) {
 opts, _, err := preparer.Prepare(apiResponse)
 require.NoError(t, err)
- InterceptMultipartFiles(response, httpRequest, customHandler, apiResponse, &SavedFileTracker{Request: httpRequest}, opts)
+ interceptMultipartFiles(response, httpRequest, customHandler, apiResponse, &SavedFileTracker{Request: httpRequest}, opts)
 upstreamRequest, err := http.ReadRequest(bufio.NewReader(&upstreamRequestBuffer))
 require.NoError(t, err)
@@ -570,7 +570,7 @@ func runUploadTest(t *testing.T, image []byte, filename string, httpCode int, ts
 opts, _, err := preparer.Prepare(apiResponse)
 require.NoError(t, err)
- InterceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts)
+ interceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts)
 require.Equal(t, httpCode, response.Code)
 }
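
A note on the checksum verification performed by uploader.Consume above: the payload is teed through an MD5 hasher while it streams to object storage, and the local digest is then compared case-insensitively with the ETag reported by the server. The following standalone sketch reproduces that flow with the standard library only; trimETag mirrors the quote-stripping done by extractETag, and the hard-coded ETag value is an illustrative stand-in for a server response, not Workhorse code.

package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"io"
	"strings"
)

// trimETag removes the double quotes S3-style servers wrap around ETags,
// mirroring extractETag in upload_strategy.go.
func trimETag(raw string) string {
	if len(raw) >= 2 && raw[0] == '"' && raw[len(raw)-1] == '"' {
		return raw[1 : len(raw)-1]
	}
	return raw
}

func main() {
	body := strings.NewReader("hello world")

	// Tee the payload through an MD5 hasher while "uploading" it,
	// as Consume does with io.TeeReader.
	hasher := md5.New()
	tee := io.TeeReader(body, hasher)

	uploaded, _ := io.ReadAll(tee) // stand-in for strategy.Upload
	_ = uploaded

	local := hex.EncodeToString(hasher.Sum(nil))
	remote := trimETag(`"5eb63bbbe01eeed093cb22bb8f5acdc3"`) // ETag a server might return

	if !strings.EqualFold(local, remote) {
		fmt.Println("ETag mismatch")
		return
	}
	fmt.Println("checksum OK:", local)
}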
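
Consume also separates the upload deadline from cleanup: the upload itself runs under a child context with a deadline, while a goroutine waits for the outer request context to finish before deleting the temporary remote object, since gitlab-rails may still be reading it while the request is in flight. A minimal sketch of that pattern; startCleanup and deleteFn are hypothetical names, not Workhorse APIs.

package main

import (
	"context"
	"fmt"
	"time"
)

// startCleanup mirrors the goroutine Consume spawns: it waits for the
// outer (request) context to be done and only then runs the delete.
func startCleanup(outerCtx context.Context, deleteFn func()) {
	go func() {
		<-outerCtx.Done()
		deleteFn()
	}()
}

func main() {
	outerCtx, cancel := context.WithCancel(context.Background())

	done := make(chan struct{})
	startCleanup(outerCtx, func() {
		fmt.Println("deleting temporary object")
		close(done)
	})

	// The upload runs under a child context with its own deadline, as in
	// Consume; cleanup does not fire until the whole request ends.
	uploadCtx, cancelUpload := context.WithDeadline(outerCtx, time.Now().Add(time.Second))
	defer cancelUpload()
	_ = uploadCtx // stand-in for strategy.Upload(uploadCtx, reader)

	cancel() // request finished: this triggers the cleanup goroutine
	<-done
}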
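
The hardLimitReader in destination/reader.go differs from io.LimitReader: instead of silently truncating the stream, it starts returning an error once more than n bytes have been consumed, which is how oversized uploads surface ErrEntityTooLarge mid-stream. A self-contained sketch of the same pattern; limitedReader and errEntityTooLarge are local stand-ins for the Workhorse types.

package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

var errEntityTooLarge = errors.New("entity is too large")

// limitedReader reproduces the hardLimitReader pattern: reads proceed
// normally until the budget n goes negative, then every read reports
// an error instead of quietly truncating the input.
type limitedReader struct {
	r io.Reader
	n int64
}

func (h *limitedReader) Read(p []byte) (int, error) {
	nRead, err := h.r.Read(p)
	h.n -= int64(nRead)
	if h.n < 0 {
		err = errEntityTooLarge
	}
	return nRead, err
}

func main() {
	src := strings.NewReader("this payload is bigger than the limit")
	r := &limitedReader{r: src, n: 10}

	_, err := io.ReadAll(r)
	fmt.Println(err) // entity is too large
}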
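
GetOpts in upload_opts.go enforces that the Rails response names exactly one destination, a local TempPath or a RemoteObject, and falls back to DefaultObjectStoreTimeout when no timeout is supplied. A pared-down sketch of that validation and deadline logic; response and deadlineFor are illustrative stand-ins for api.Response and GetOpts.

package main

import (
	"errors"
	"fmt"
	"time"
)

const defaultTimeout = 4 * time.Hour // mirrors DefaultObjectStoreTimeout

// response keeps only the fields needed to show the decision logic.
type response struct {
	TempPath string // local destination
	RemoteID string // remote destination
	Timeout  time.Duration
}

func deadlineFor(r *response) (time.Time, error) {
	// Exactly one destination must be present, as GetOpts enforces.
	if r.TempPath != "" && r.RemoteID != "" {
		return time.Time{}, errors.New("API response has both TempPath and RemoteObject")
	}
	if r.TempPath == "" && r.RemoteID == "" {
		return time.Time{}, errors.New("API response has neither TempPath nor RemoteObject")
	}

	timeout := r.Timeout
	if timeout == 0 {
		timeout = defaultTimeout
	}
	return time.Now().Add(timeout), nil
}

func main() {
	if _, err := deadlineFor(&response{}); err != nil {
		fmt.Println(err) // neither destination set
	}
	d, _ := deadlineFor(&response{RemoteID: "id", Timeout: 10 * time.Second})
	fmt.Println("upload must finish by", d.Format(time.RFC3339))
}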
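
Finally, the Verifier interface in preparer.go, as implemented by the LFS object type above, lets Workhorse abort an upload whose size or SHA256 does not match what Rails authorized. The sketch below mirrors that check against a local fileInfo stand-in for destination.FileHandler (which exposes SHA256 as a method rather than a field); the hash shown is the SHA256 of the string "test".

package main

import (
	"fmt"
)

// fileInfo is a stand-in for destination.FileHandler, exposing just the
// two properties the LFS verifier relies on.
type fileInfo struct {
	Size   int64
	SHA256 string
}

// verifier mirrors upload.Verifier: a post-upload check that can abort
// the request by returning an error.
type verifier interface {
	Verify(fh *fileInfo) error
}

// lfsObject reproduces the size/checksum verification performed by the
// LFS preparer's object type.
type lfsObject struct {
	size int64
	oid  string
}

func (l *lfsObject) Verify(fh *fileInfo) error {
	if fh.Size != l.size {
		return fmt.Errorf("LFSObject: expected size %d, wrote %d", l.size, fh.Size)
	}
	if fh.SHA256 != l.oid {
		return fmt.Errorf("LFSObject: expected sha256 %s, got %s", l.oid, fh.SHA256)
	}
	return nil
}

func main() {
	v := verifier(&lfsObject{
		size: 4,
		oid:  "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08",
	})

	err := v.Verify(&fileInfo{
		Size:   4,
		SHA256: "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08",
	})
	fmt.Println(err) // <nil>: size and checksum both match
}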