diff options
Diffstat (limited to 'workhorse/internal/filestore')
-rw-r--r-- | workhorse/internal/filestore/file_handler.go | 257 | ||||
-rw-r--r-- | workhorse/internal/filestore/file_handler_test.go | 551 | ||||
-rw-r--r-- | workhorse/internal/filestore/multi_hash.go | 48 | ||||
-rw-r--r-- | workhorse/internal/filestore/reader.go | 17 | ||||
-rw-r--r-- | workhorse/internal/filestore/reader_test.go | 46 | ||||
-rw-r--r-- | workhorse/internal/filestore/save_file_opts.go | 171 | ||||
-rw-r--r-- | workhorse/internal/filestore/save_file_opts_test.go | 331 |
7 files changed, 1421 insertions, 0 deletions
diff --git a/workhorse/internal/filestore/file_handler.go b/workhorse/internal/filestore/file_handler.go new file mode 100644 index 00000000000..19764e9a5cf --- /dev/null +++ b/workhorse/internal/filestore/file_handler.go @@ -0,0 +1,257 @@ +package filestore + +import ( + "context" + "errors" + "fmt" + "io" + "io/ioutil" + "os" + "strconv" + "time" + + "github.com/dgrijalva/jwt-go" + + "gitlab.com/gitlab-org/labkit/log" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore" + "gitlab.com/gitlab-org/gitlab-workhorse/internal/secret" +) + +type SizeError error + +// ErrEntityTooLarge means that the uploaded content is bigger than the maximum allowed size +var ErrEntityTooLarge = errors.New("entity is too large") + +// FileHandler represents a file that has been processed for upload. +// It may be uploaded to an ObjectStore and/or saved on a local path. +type FileHandler struct { + // LocalPath is the path on the disk where the file has been stored + LocalPath string + + // RemoteID is the objectID provided by GitLab Rails + RemoteID string + // RemoteURL is the ObjectStore URL provided by GitLab Rails + RemoteURL string + + // Size is the persisted file size + Size int64 + + // Name is the resource name to send back to GitLab Rails. + // It differs from the real file name in order to avoid file collisions + Name string + + // hashes maps a hash algorithm name (e.g. "md5", "sha256") to the hex-encoded digest of the file + hashes map[string]string +} + +// uploadClaims is the JWT claim set that carries the upload fields under the "upload" key +type uploadClaims struct { + Upload map[string]string `json:"upload"` + jwt.StandardClaims +} + +// SHA256 returns the SHA256 hash of the handled file (empty when no hash has been recorded) +func (fh *FileHandler) SHA256() string { + return fh.hashes["sha256"] +} + +// MD5 returns the MD5 hash of the handled file (empty when no hash has been recorded) +func (fh *FileHandler) MD5() string { + return fh.hashes["md5"] +} + +// GitLabFinalizeFields returns a map with all the fields GitLab Rails needs in order to finalize the upload. 
+func (fh *FileHandler) GitLabFinalizeFields(prefix string) (map[string]string, error) { + // TODO: remove `data` these once rails fully and exclusively support `signedData` (https://gitlab.com/gitlab-org/gitlab-workhorse/-/issues/263) + data := make(map[string]string) + signedData := make(map[string]string) + key := func(field string) string { + if prefix == "" { + return field + } + + return fmt.Sprintf("%s.%s", prefix, field) + } + + for k, v := range map[string]string{ + "name": fh.Name, + "path": fh.LocalPath, + "remote_url": fh.RemoteURL, + "remote_id": fh.RemoteID, + "size": strconv.FormatInt(fh.Size, 10), + } { + data[key(k)] = v + signedData[k] = v + } + + for hashName, hash := range fh.hashes { + data[key(hashName)] = hash + signedData[hashName] = hash + } + + claims := uploadClaims{Upload: signedData, StandardClaims: secret.DefaultClaims} + jwtData, err := secret.JWTTokenString(claims) + if err != nil { + return nil, err + } + data[key("gitlab-workhorse-upload")] = jwtData + + return data, nil +} + +type consumer interface { + Consume(context.Context, io.Reader, time.Time) (int64, error) +} + +// SaveFileFromReader persists the provided reader content to all the location specified in opts. A cleanup will be performed once ctx is Done +// Make sure the provided context will not expire before finalizing upload with GitLab Rails. 
+func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts *SaveFileOpts) (fh *FileHandler, err error) { + var uploadDestination consumer + fh = &FileHandler{ + Name: opts.TempFilePrefix, + RemoteID: opts.RemoteID, + RemoteURL: opts.RemoteURL, + } + hashes := newMultiHash() + reader = io.TeeReader(reader, hashes.Writer) + + var clientMode string + + switch { + case opts.IsLocal(): + clientMode = "local" + uploadDestination, err = fh.uploadLocalFile(ctx, opts) + case opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsGoCloud(): + clientMode = fmt.Sprintf("go_cloud:%s", opts.ObjectStorageConfig.Provider) + p := &objectstore.GoCloudObjectParams{ + Ctx: ctx, + Mux: opts.ObjectStorageConfig.URLMux, + BucketURL: opts.ObjectStorageConfig.GoCloudConfig.URL, + ObjectName: opts.RemoteTempObjectID, + } + uploadDestination, err = objectstore.NewGoCloudObject(p) + case opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsAWS() && opts.ObjectStorageConfig.IsValid(): + clientMode = "s3" + uploadDestination, err = objectstore.NewS3Object( + opts.RemoteTempObjectID, + opts.ObjectStorageConfig.S3Credentials, + opts.ObjectStorageConfig.S3Config, + ) + case opts.IsMultipart(): + clientMode = "multipart" + uploadDestination, err = objectstore.NewMultipart( + opts.PresignedParts, + opts.PresignedCompleteMultipart, + opts.PresignedAbortMultipart, + opts.PresignedDelete, + opts.PutHeaders, + opts.PartSize, + ) + default: + clientMode = "http" + uploadDestination, err = objectstore.NewObject( + opts.PresignedPut, + opts.PresignedDelete, + opts.PutHeaders, + size, + ) + } + + if err != nil { + return nil, err + } + + if opts.MaximumSize > 0 { + if size > opts.MaximumSize { + return nil, SizeError(fmt.Errorf("the upload size %d is over maximum of %d bytes", size, opts.MaximumSize)) + } + + hlr := &hardLimitReader{r: reader, n: opts.MaximumSize} + reader = hlr + defer func() { + if hlr.n < 0 { + err = ErrEntityTooLarge + } + }() + } + + fh.Size, err 
= uploadDestination.Consume(ctx, reader, opts.Deadline) + if err != nil { + if err == objectstore.ErrNotEnoughParts { + err = ErrEntityTooLarge + } + return nil, err + } + + if size != -1 && size != fh.Size { + return nil, SizeError(fmt.Errorf("expected %d bytes but got only %d", size, fh.Size)) + } + + logger := log.WithContextFields(ctx, log.Fields{ + "copied_bytes": fh.Size, + "is_local": opts.IsLocal(), + "is_multipart": opts.IsMultipart(), + "is_remote": !opts.IsLocal(), + "remote_id": opts.RemoteID, + "temp_file_prefix": opts.TempFilePrefix, + "client_mode": clientMode, + }) + + if opts.IsLocal() { + logger = logger.WithField("local_temp_path", opts.LocalTempPath) + } else { + logger = logger.WithField("remote_temp_object", opts.RemoteTempObjectID) + } + + logger.Info("saved file") + fh.hashes = hashes.finish() + return fh, nil +} + +func (fh *FileHandler) uploadLocalFile(ctx context.Context, opts *SaveFileOpts) (consumer, error) { + // make sure TempFolder exists + err := os.MkdirAll(opts.LocalTempPath, 0700) + if err != nil { + return nil, fmt.Errorf("uploadLocalFile: mkdir %q: %v", opts.LocalTempPath, err) + } + + file, err := ioutil.TempFile(opts.LocalTempPath, opts.TempFilePrefix) + if err != nil { + return nil, fmt.Errorf("uploadLocalFile: create file: %v", err) + } + + go func() { + <-ctx.Done() + os.Remove(file.Name()) + }() + + fh.LocalPath = file.Name() + return &localUpload{file}, nil +} + +type localUpload struct{ io.WriteCloser } + +func (loc *localUpload) Consume(_ context.Context, r io.Reader, _ time.Time) (int64, error) { + n, err := io.Copy(loc.WriteCloser, r) + errClose := loc.Close() + if err == nil { + err = errClose + } + return n, err +} + +// SaveFileFromDisk open the local file fileName and calls SaveFileFromReader +func SaveFileFromDisk(ctx context.Context, fileName string, opts *SaveFileOpts) (fh *FileHandler, err error) { + file, err := os.Open(fileName) + if err != nil { + return nil, err + } + defer file.Close() + + fi, err := 
file.Stat() + if err != nil { + return nil, err + } + + return SaveFileFromReader(ctx, file, fi.Size(), opts) +} diff --git a/workhorse/internal/filestore/file_handler_test.go b/workhorse/internal/filestore/file_handler_test.go new file mode 100644 index 00000000000..e79e9d0f292 --- /dev/null +++ b/workhorse/internal/filestore/file_handler_test.go @@ -0,0 +1,551 @@ +package filestore_test + +import ( + "context" + "errors" + "fmt" + "io/ioutil" + "os" + "path" + "strconv" + "strings" + "testing" + "time" + + "github.com/dgrijalva/jwt-go" + "github.com/stretchr/testify/require" + "gocloud.dev/blob" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/config" + "gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore" + "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test" + "gitlab.com/gitlab-org/gitlab-workhorse/internal/testhelper" +) + +func testDeadline() time.Time { + return time.Now().Add(filestore.DefaultObjectStoreTimeout) +} + +func requireFileGetsRemovedAsync(t *testing.T, filePath string) { + var err error + + // Poll because the file removal is async + for i := 0; i < 100; i++ { + _, err = os.Stat(filePath) + if err != nil { + break + } + time.Sleep(100 * time.Millisecond) + } + + require.True(t, os.IsNotExist(err), "File hasn't been deleted during cleanup") +} + +func requireObjectStoreDeletedAsync(t *testing.T, expectedDeletes int, osStub *test.ObjectstoreStub) { + // Poll because the object removal is async + for i := 0; i < 100; i++ { + if osStub.DeletesCnt() == expectedDeletes { + break + } + time.Sleep(10 * time.Millisecond) + } + + require.Equal(t, expectedDeletes, osStub.DeletesCnt(), "Object not deleted") +} + +func TestSaveFileWrongSize(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp") + require.NoError(t, err) + defer os.RemoveAll(tmpFolder) + + opts := &filestore.SaveFileOpts{LocalTempPath: tmpFolder, TempFilePrefix: 
"test-file"} + fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize+1, opts) + require.Error(t, err) + _, isSizeError := err.(filestore.SizeError) + require.True(t, isSizeError, "Should fail with SizeError") + require.Nil(t, fh) +} + +func TestSaveFileWithKnownSizeExceedLimit(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp") + require.NoError(t, err) + defer os.RemoveAll(tmpFolder) + + opts := &filestore.SaveFileOpts{LocalTempPath: tmpFolder, TempFilePrefix: "test-file", MaximumSize: test.ObjectSize - 1} + fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, opts) + require.Error(t, err) + _, isSizeError := err.(filestore.SizeError) + require.True(t, isSizeError, "Should fail with SizeError") + require.Nil(t, fh) +} + +func TestSaveFileWithUnknownSizeExceedLimit(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp") + require.NoError(t, err) + defer os.RemoveAll(tmpFolder) + + opts := &filestore.SaveFileOpts{LocalTempPath: tmpFolder, TempFilePrefix: "test-file", MaximumSize: test.ObjectSize - 1} + fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), -1, opts) + require.Equal(t, err, filestore.ErrEntityTooLarge) + require.Nil(t, fh) +} + +func TestSaveFromDiskNotExistingFile(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fh, err := filestore.SaveFileFromDisk(ctx, "/I/do/not/exist", &filestore.SaveFileOpts{}) + require.Error(t, err, "SaveFileFromDisk should fail") + require.True(t, os.IsNotExist(err), "Provided file should not exists") + require.Nil(t, fh, "On error FileHandler should be nil") +} + +func TestSaveFileWrongETag(t *testing.T) { + tests := []struct { + name string + multipart bool + }{ 
+ {name: "single part"}, + {name: "multi part", multipart: true}, + } + + for _, spec := range tests { + t.Run(spec.name, func(t *testing.T) { + osStub, ts := test.StartObjectStoreWithCustomMD5(map[string]string{test.ObjectPath: "brokenMD5"}) + defer ts.Close() + + objectURL := ts.URL + test.ObjectPath + + opts := &filestore.SaveFileOpts{ + RemoteID: "test-file", + RemoteURL: objectURL, + PresignedPut: objectURL + "?Signature=ASignature", + PresignedDelete: objectURL + "?Signature=AnotherSignature", + Deadline: testDeadline(), + } + if spec.multipart { + opts.PresignedParts = []string{objectURL + "?partNumber=1"} + opts.PresignedCompleteMultipart = objectURL + "?Signature=CompleteSig" + opts.PresignedAbortMultipart = objectURL + "?Signature=AbortSig" + opts.PartSize = test.ObjectSize + + osStub.InitiateMultipartUpload(test.ObjectPath) + } + ctx, cancel := context.WithCancel(context.Background()) + fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, opts) + require.Nil(t, fh) + require.Error(t, err) + require.Equal(t, 1, osStub.PutsCnt(), "File not uploaded") + + cancel() // this will trigger an async cleanup + requireObjectStoreDeletedAsync(t, 1, osStub) + require.False(t, spec.multipart && osStub.IsMultipartUpload(test.ObjectPath), "there must be no multipart upload in progress now") + }) + } +} + +func TestSaveFileFromDiskToLocalPath(t *testing.T) { + f, err := ioutil.TempFile("", "workhorse-test") + require.NoError(t, err) + defer os.Remove(f.Name()) + + _, err = fmt.Fprint(f, test.ObjectContent) + require.NoError(t, err) + + tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp") + require.NoError(t, err) + defer os.RemoveAll(tmpFolder) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + opts := &filestore.SaveFileOpts{LocalTempPath: tmpFolder} + fh, err := filestore.SaveFileFromDisk(ctx, f.Name(), opts) + require.NoError(t, err) + require.NotNil(t, fh) + + require.NotEmpty(t, 
fh.LocalPath, "File not persisted on disk") + _, err = os.Stat(fh.LocalPath) + require.NoError(t, err) +} + +func TestSaveFile(t *testing.T) { + testhelper.ConfigureSecret() + + type remote int + const ( + notRemote remote = iota + remoteSingle + remoteMultipart + ) + + tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp") + require.NoError(t, err) + defer os.RemoveAll(tmpFolder) + + tests := []struct { + name string + local bool + remote remote + }{ + {name: "Local only", local: true}, + {name: "Remote Single only", remote: remoteSingle}, + {name: "Remote Multipart only", remote: remoteMultipart}, + } + + for _, spec := range tests { + t.Run(spec.name, func(t *testing.T) { + var opts filestore.SaveFileOpts + var expectedDeletes, expectedPuts int + + osStub, ts := test.StartObjectStore() + defer ts.Close() + + switch spec.remote { + case remoteSingle: + objectURL := ts.URL + test.ObjectPath + + opts.RemoteID = "test-file" + opts.RemoteURL = objectURL + opts.PresignedPut = objectURL + "?Signature=ASignature" + opts.PresignedDelete = objectURL + "?Signature=AnotherSignature" + opts.Deadline = testDeadline() + + expectedDeletes = 1 + expectedPuts = 1 + case remoteMultipart: + objectURL := ts.URL + test.ObjectPath + + opts.RemoteID = "test-file" + opts.RemoteURL = objectURL + opts.PresignedDelete = objectURL + "?Signature=AnotherSignature" + opts.PartSize = int64(len(test.ObjectContent)/2) + 1 + opts.PresignedParts = []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"} + opts.PresignedCompleteMultipart = objectURL + "?Signature=CompleteSignature" + opts.Deadline = testDeadline() + + osStub.InitiateMultipartUpload(test.ObjectPath) + expectedDeletes = 1 + expectedPuts = 2 + } + + if spec.local { + opts.LocalTempPath = tmpFolder + opts.TempFilePrefix = "test-file" + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts) 
+ require.NoError(t, err) + require.NotNil(t, fh) + + require.Equal(t, opts.RemoteID, fh.RemoteID) + require.Equal(t, opts.RemoteURL, fh.RemoteURL) + + if spec.local { + require.NotEmpty(t, fh.LocalPath, "File not persisted on disk") + _, err := os.Stat(fh.LocalPath) + require.NoError(t, err) + + dir := path.Dir(fh.LocalPath) + require.Equal(t, opts.LocalTempPath, dir) + filename := path.Base(fh.LocalPath) + beginsWithPrefix := strings.HasPrefix(filename, opts.TempFilePrefix) + require.True(t, beginsWithPrefix, fmt.Sprintf("LocalPath filename %q do not begin with TempFilePrefix %q", filename, opts.TempFilePrefix)) + } else { + require.Empty(t, fh.LocalPath, "LocalPath must be empty for non local uploads") + } + + require.Equal(t, test.ObjectSize, fh.Size) + require.Equal(t, test.ObjectMD5, fh.MD5()) + require.Equal(t, test.ObjectSHA256, fh.SHA256()) + + require.Equal(t, expectedPuts, osStub.PutsCnt(), "ObjectStore PutObject count mismatch") + require.Equal(t, 0, osStub.DeletesCnt(), "File deleted too early") + + cancel() // this will trigger an async cleanup + requireObjectStoreDeletedAsync(t, expectedDeletes, osStub) + requireFileGetsRemovedAsync(t, fh.LocalPath) + + // checking generated fields + fields, err := fh.GitLabFinalizeFields("file") + require.NoError(t, err) + + checkFileHandlerWithFields(t, fh, fields, "file") + + token, jwtErr := jwt.ParseWithClaims(fields["file.gitlab-workhorse-upload"], &testhelper.UploadClaims{}, testhelper.ParseJWT) + require.NoError(t, jwtErr) + + uploadFields := token.Claims.(*testhelper.UploadClaims).Upload + + checkFileHandlerWithFields(t, fh, uploadFields, "") + }) + } +} + +func TestSaveFileWithS3WorkhorseClient(t *testing.T) { + tests := []struct { + name string + objectSize int64 + maxSize int64 + expectedErr error + }{ + { + name: "known size with no limit", + objectSize: test.ObjectSize, + }, + { + name: "unknown size with no limit", + objectSize: -1, + }, + { + name: "unknown object size with limit", + objectSize: -1, + 
maxSize: test.ObjectSize - 1, + expectedErr: filestore.ErrEntityTooLarge, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + + s3Creds, s3Config, sess, ts := test.SetupS3(t, "") + defer ts.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + remoteObject := "tmp/test-file/1" + opts := filestore.SaveFileOpts{ + RemoteID: "test-file", + Deadline: testDeadline(), + UseWorkhorseClient: true, + RemoteTempObjectID: remoteObject, + ObjectStorageConfig: filestore.ObjectStorageConfig{ + Provider: "AWS", + S3Credentials: s3Creds, + S3Config: s3Config, + }, + MaximumSize: tc.maxSize, + } + + _, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), tc.objectSize, &opts) + + if tc.expectedErr == nil { + require.NoError(t, err) + test.S3ObjectExists(t, sess, s3Config, remoteObject, test.ObjectContent) + } else { + require.Equal(t, tc.expectedErr, err) + test.S3ObjectDoesNotExist(t, sess, s3Config, remoteObject) + } + }) + } +} + +func TestSaveFileWithAzureWorkhorseClient(t *testing.T) { + mux, bucketDir, cleanup := test.SetupGoCloudFileBucket(t, "azblob") + defer cleanup() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + remoteObject := "tmp/test-file/1" + opts := filestore.SaveFileOpts{ + RemoteID: "test-file", + Deadline: testDeadline(), + UseWorkhorseClient: true, + RemoteTempObjectID: remoteObject, + ObjectStorageConfig: filestore.ObjectStorageConfig{ + Provider: "AzureRM", + URLMux: mux, + GoCloudConfig: config.GoCloudConfig{URL: "azblob://test-container"}, + }, + } + + _, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts) + require.NoError(t, err) + + test.GoCloudObjectExists(t, bucketDir, remoteObject) +} + +func TestSaveFileWithUnknownGoCloudScheme(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mux := new(blob.URLMux) + + remoteObject := 
"tmp/test-file/1" + opts := filestore.SaveFileOpts{ + RemoteID: "test-file", + Deadline: testDeadline(), + UseWorkhorseClient: true, + RemoteTempObjectID: remoteObject, + ObjectStorageConfig: filestore.ObjectStorageConfig{ + Provider: "SomeCloud", + URLMux: mux, + GoCloudConfig: config.GoCloudConfig{URL: "foo://test-container"}, + }, + } + + _, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts) + require.Error(t, err) +} + +func TestSaveMultipartInBodyFailure(t *testing.T) { + osStub, ts := test.StartObjectStore() + defer ts.Close() + + // this is a broken path because it contains bucket name but no key + // this is the only way to get an in-body failure from our ObjectStoreStub + objectPath := "/bucket-but-no-object-key" + objectURL := ts.URL + objectPath + opts := filestore.SaveFileOpts{ + RemoteID: "test-file", + RemoteURL: objectURL, + PartSize: test.ObjectSize, + PresignedParts: []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"}, + PresignedCompleteMultipart: objectURL + "?Signature=CompleteSignature", + Deadline: testDeadline(), + } + + osStub.InitiateMultipartUpload(objectPath) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts) + require.Nil(t, fh) + require.Error(t, err) + require.EqualError(t, err, test.MultipartUploadInternalError().Error()) +} + +func TestSaveRemoteFileWithLimit(t *testing.T) { + testhelper.ConfigureSecret() + + type remote int + const ( + notRemote remote = iota + remoteSingle + remoteMultipart + ) + + remoteTypes := []remote{remoteSingle, remoteMultipart} + + tests := []struct { + name string + objectSize int64 + maxSize int64 + expectedErr error + testData string + }{ + { + name: "known size with no limit", + testData: test.ObjectContent, + objectSize: test.ObjectSize, + }, + { + name: "unknown size with no limit", + testData: 
test.ObjectContent, + objectSize: -1, + }, + { + name: "unknown object size with limit", + testData: test.ObjectContent, + objectSize: -1, + maxSize: test.ObjectSize - 1, + expectedErr: filestore.ErrEntityTooLarge, + }, + { + name: "large object with unknown size with limit", + testData: string(make([]byte, 20000)), + objectSize: -1, + maxSize: 19000, + expectedErr: filestore.ErrEntityTooLarge, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + var opts filestore.SaveFileOpts + + for _, remoteType := range remoteTypes { + tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp") + require.NoError(t, err) + defer os.RemoveAll(tmpFolder) + + osStub, ts := test.StartObjectStore() + defer ts.Close() + + switch remoteType { + case remoteSingle: + objectURL := ts.URL + test.ObjectPath + + opts.RemoteID = "test-file" + opts.RemoteURL = objectURL + opts.PresignedPut = objectURL + "?Signature=ASignature" + opts.PresignedDelete = objectURL + "?Signature=AnotherSignature" + opts.Deadline = testDeadline() + opts.MaximumSize = tc.maxSize + case remoteMultipart: + objectURL := ts.URL + test.ObjectPath + + opts.RemoteID = "test-file" + opts.RemoteURL = objectURL + opts.PresignedDelete = objectURL + "?Signature=AnotherSignature" + opts.PartSize = int64(len(tc.testData)/2) + 1 + opts.PresignedParts = []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"} + opts.PresignedCompleteMultipart = objectURL + "?Signature=CompleteSignature" + opts.Deadline = testDeadline() + opts.MaximumSize = tc.maxSize + + require.Less(t, int64(len(tc.testData)), int64(len(opts.PresignedParts))*opts.PartSize, "check part size calculation") + + osStub.InitiateMultipartUpload(test.ObjectPath) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(tc.testData), tc.objectSize, &opts) + + if tc.expectedErr == nil { + require.NoError(t, err) + require.NotNil(t, fh) + } else { + 
require.True(t, errors.Is(err, tc.expectedErr)) + require.Nil(t, fh) + } + } + }) + } +} + +func checkFileHandlerWithFields(t *testing.T, fh *filestore.FileHandler, fields map[string]string, prefix string) { + key := func(field string) string { + if prefix == "" { + return field + } + + return fmt.Sprintf("%s.%s", prefix, field) + } + + require.Equal(t, fh.Name, fields[key("name")]) + require.Equal(t, fh.LocalPath, fields[key("path")]) + require.Equal(t, fh.RemoteURL, fields[key("remote_url")]) + require.Equal(t, fh.RemoteID, fields[key("remote_id")]) + require.Equal(t, strconv.FormatInt(test.ObjectSize, 10), fields[key("size")]) + require.Equal(t, test.ObjectMD5, fields[key("md5")]) + require.Equal(t, test.ObjectSHA1, fields[key("sha1")]) + require.Equal(t, test.ObjectSHA256, fields[key("sha256")]) + require.Equal(t, test.ObjectSHA512, fields[key("sha512")]) +} diff --git a/workhorse/internal/filestore/multi_hash.go b/workhorse/internal/filestore/multi_hash.go new file mode 100644 index 00000000000..40efd3a5c1f --- /dev/null +++ b/workhorse/internal/filestore/multi_hash.go @@ -0,0 +1,48 @@ +package filestore + +import ( + "crypto/md5" + "crypto/sha1" + "crypto/sha256" + "crypto/sha512" + "encoding/hex" + "hash" + "io" +) + +var hashFactories = map[string](func() hash.Hash){ + "md5": md5.New, + "sha1": sha1.New, + "sha256": sha256.New, + "sha512": sha512.New, +} + +type multiHash struct { + io.Writer + hashes map[string]hash.Hash +} + +func newMultiHash() (m *multiHash) { + m = &multiHash{} + m.hashes = make(map[string]hash.Hash) + + var writers []io.Writer + for hash, hashFactory := range hashFactories { + writer := hashFactory() + + m.hashes[hash] = writer + writers = append(writers, writer) + } + + m.Writer = io.MultiWriter(writers...) 
+ return m +} + +func (m *multiHash) finish() map[string]string { + h := make(map[string]string) + for hashName, hash := range m.hashes { + checksum := hash.Sum(nil) + h[hashName] = hex.EncodeToString(checksum) + } + return h +} diff --git a/workhorse/internal/filestore/reader.go b/workhorse/internal/filestore/reader.go new file mode 100644 index 00000000000..b1045b991fc --- /dev/null +++ b/workhorse/internal/filestore/reader.go @@ -0,0 +1,17 @@ +package filestore + +import "io" + +type hardLimitReader struct { + r io.Reader + n int64 +} + +func (h *hardLimitReader) Read(p []byte) (int, error) { + nRead, err := h.r.Read(p) + h.n -= int64(nRead) + if h.n < 0 { + err = ErrEntityTooLarge + } + return nRead, err +} diff --git a/workhorse/internal/filestore/reader_test.go b/workhorse/internal/filestore/reader_test.go new file mode 100644 index 00000000000..424d921ecaf --- /dev/null +++ b/workhorse/internal/filestore/reader_test.go @@ -0,0 +1,46 @@ +package filestore + +import ( + "fmt" + "io/ioutil" + "strings" + "testing" + "testing/iotest" + + "github.com/stretchr/testify/require" +) + +func TestHardLimitReader(t *testing.T) { + const text = "hello world" + r := iotest.OneByteReader( + &hardLimitReader{ + r: strings.NewReader(text), + n: int64(len(text)), + }, + ) + + out, err := ioutil.ReadAll(r) + require.NoError(t, err) + require.Equal(t, text, string(out)) +} + +func TestHardLimitReaderFail(t *testing.T) { + const text = "hello world" + + for bufSize := len(text) / 2; bufSize < len(text)*2; bufSize++ { + t.Run(fmt.Sprintf("bufsize:%d", bufSize), func(t *testing.T) { + r := &hardLimitReader{ + r: iotest.DataErrReader(strings.NewReader(text)), + n: int64(len(text)) - 1, + } + buf := make([]byte, bufSize) + + var err error + for i := 0; err == nil && i < 1000; i++ { + _, err = r.Read(buf) + } + + require.Equal(t, ErrEntityTooLarge, err) + }) + } +} diff --git a/workhorse/internal/filestore/save_file_opts.go b/workhorse/internal/filestore/save_file_opts.go new file 
mode 100644 index 00000000000..1eb708c3f55 --- /dev/null +++ b/workhorse/internal/filestore/save_file_opts.go @@ -0,0 +1,171 @@ +package filestore + +import ( + "errors" + "strings" + "time" + + "gocloud.dev/blob" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/api" + "gitlab.com/gitlab-org/gitlab-workhorse/internal/config" +) + +// DefaultObjectStoreTimeout is the timeout for ObjectStore upload operation +const DefaultObjectStoreTimeout = 4 * time.Hour + +type ObjectStorageConfig struct { + Provider string + + S3Credentials config.S3Credentials + S3Config config.S3Config + + // GoCloud mux that maps azureblob:// and future URLs (e.g. s3://, gcs://, etc.) to a handler + URLMux *blob.URLMux + + // Azure credentials are registered at startup in the GoCloud URLMux, so only the container name is needed + GoCloudConfig config.GoCloudConfig +} + +// SaveFileOpts represents all the options available for saving a file to object store +type SaveFileOpts struct { + // TempFilePrefix is the prefix used to create temporary local file + TempFilePrefix string + // LocalTempPath is the directory where to write a local copy of the file + LocalTempPath string + // RemoteID is the remote ObjectID provided by GitLab + RemoteID string + // RemoteURL is the final URL of the file + RemoteURL string + // PresignedPut is a presigned S3 PutObject compatible URL + PresignedPut string + // PresignedDelete is a presigned S3 DeleteObject compatible URL. + PresignedDelete string + // HTTP headers to be sent along with PUT request + PutHeaders map[string]string + // Whether to ignore Rails pre-signed URLs and have Workhorse directly access object storage provider + UseWorkhorseClient bool + // If UseWorkhorseClient is true, this is the temporary object name to store the file + RemoteTempObjectID string + // Workhorse object storage client (e.g. 
S3) parameters + ObjectStorageConfig ObjectStorageConfig + // Deadline it the S3 operation deadline, the upload will be aborted if not completed in time + Deadline time.Time + // The maximum accepted size in bytes of the upload + MaximumSize int64 + + //MultipartUpload parameters + // PartSize is the exact size of each uploaded part. Only the last one can be smaller + PartSize int64 + // PresignedParts contains the presigned URLs for each part + PresignedParts []string + // PresignedCompleteMultipart is a presigned URL for CompleteMulipartUpload + PresignedCompleteMultipart string + // PresignedAbortMultipart is a presigned URL for AbortMultipartUpload + PresignedAbortMultipart string +} + +// UseWorkhorseClientEnabled checks if the options require direct access to object storage +func (s *SaveFileOpts) UseWorkhorseClientEnabled() bool { + return s.UseWorkhorseClient && s.ObjectStorageConfig.IsValid() && s.RemoteTempObjectID != "" +} + +// IsLocal checks if the options require the writing of the file on disk +func (s *SaveFileOpts) IsLocal() bool { + return s.LocalTempPath != "" +} + +// IsMultipart checks if the options requires a Multipart upload +func (s *SaveFileOpts) IsMultipart() bool { + return s.PartSize > 0 +} + +// GetOpts converts GitLab api.Response to a proper SaveFileOpts +func GetOpts(apiResponse *api.Response) (*SaveFileOpts, error) { + timeout := time.Duration(apiResponse.RemoteObject.Timeout) * time.Second + if timeout == 0 { + timeout = DefaultObjectStoreTimeout + } + + opts := SaveFileOpts{ + LocalTempPath: apiResponse.TempPath, + RemoteID: apiResponse.RemoteObject.ID, + RemoteURL: apiResponse.RemoteObject.GetURL, + PresignedPut: apiResponse.RemoteObject.StoreURL, + PresignedDelete: apiResponse.RemoteObject.DeleteURL, + PutHeaders: apiResponse.RemoteObject.PutHeaders, + UseWorkhorseClient: apiResponse.RemoteObject.UseWorkhorseClient, + RemoteTempObjectID: apiResponse.RemoteObject.RemoteTempObjectID, + Deadline: time.Now().Add(timeout), + 
MaximumSize: apiResponse.MaximumSize, + } + + if opts.LocalTempPath != "" && opts.RemoteID != "" { + return nil, errors.New("API response has both TempPath and RemoteObject") + } + + if opts.LocalTempPath == "" && opts.RemoteID == "" { + return nil, errors.New("API response has neither TempPath nor RemoteObject") + } + + objectStorageParams := apiResponse.RemoteObject.ObjectStorage + if opts.UseWorkhorseClient && objectStorageParams != nil { + opts.ObjectStorageConfig.Provider = objectStorageParams.Provider + opts.ObjectStorageConfig.S3Config = objectStorageParams.S3Config + opts.ObjectStorageConfig.GoCloudConfig = objectStorageParams.GoCloudConfig + } + + // Backwards compatibility to ensure API servers that do not include the + // CustomPutHeaders flag will default to the original content type. + if !apiResponse.RemoteObject.CustomPutHeaders { + opts.PutHeaders = make(map[string]string) + opts.PutHeaders["Content-Type"] = "application/octet-stream" + } + + if multiParams := apiResponse.RemoteObject.MultipartUpload; multiParams != nil { + opts.PartSize = multiParams.PartSize + opts.PresignedCompleteMultipart = multiParams.CompleteURL + opts.PresignedAbortMultipart = multiParams.AbortURL + opts.PresignedParts = append([]string(nil), multiParams.PartURLs...) 
+ } + + return &opts, nil +} + +func (c *ObjectStorageConfig) IsAWS() bool { + return strings.EqualFold(c.Provider, "AWS") || strings.EqualFold(c.Provider, "S3") +} + +func (c *ObjectStorageConfig) IsAzure() bool { + return strings.EqualFold(c.Provider, "AzureRM") +} + +func (c *ObjectStorageConfig) IsGoCloud() bool { + return c.GoCloudConfig.URL != "" +} + +func (c *ObjectStorageConfig) IsValid() bool { + if c.IsAWS() { + return c.S3Config.Bucket != "" && c.S3Config.Region != "" && c.s3CredentialsValid() + } else if c.IsGoCloud() { + // We could parse and validate the URL, but GoCloud providers + // such as AzureRM don't have a fallback to normal HTTP, so we + // always want to try the GoCloud path if there is a URL. + return true + } + + return false +} + +func (c *ObjectStorageConfig) s3CredentialsValid() bool { + // We need to be able to distinguish between two cases of AWS access: + // 1. AWS access via key and secret, but credentials not configured in Workhorse + // 2. IAM instance profiles used + if c.S3Config.UseIamProfile { + return true + } else if c.S3Credentials.AwsAccessKeyID != "" && c.S3Credentials.AwsSecretAccessKey != "" { + return true + } + + return false +} diff --git a/workhorse/internal/filestore/save_file_opts_test.go b/workhorse/internal/filestore/save_file_opts_test.go new file mode 100644 index 00000000000..2d6cd683b51 --- /dev/null +++ b/workhorse/internal/filestore/save_file_opts_test.go @@ -0,0 +1,331 @@ +package filestore_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/api" + "gitlab.com/gitlab-org/gitlab-workhorse/internal/config" + "gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore" + "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test" +) + +func TestSaveFileOptsLocalAndRemote(t *testing.T) { + tests := []struct { + name string + localTempPath string + presignedPut string + partSize int64 + isLocal bool + isRemote bool + 
isMultipart bool + }{ + { + name: "Only LocalTempPath", + localTempPath: "/tmp", + isLocal: true, + }, + { + name: "No paths", + }, + { + name: "Only remoteUrl", + presignedPut: "http://example.com", + }, + { + name: "Multipart", + partSize: 10, + isMultipart: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + opts := filestore.SaveFileOpts{ + LocalTempPath: test.localTempPath, + PresignedPut: test.presignedPut, + PartSize: test.partSize, + } + + require.Equal(t, test.isLocal, opts.IsLocal(), "IsLocal() mismatch") + require.Equal(t, test.isMultipart, opts.IsMultipart(), "IsMultipart() mismatch") + }) + } +} + +func TestGetOpts(t *testing.T) { + tests := []struct { + name string + multipart *api.MultipartUploadParams + customPutHeaders bool + putHeaders map[string]string + }{ + { + name: "Single upload", + }, { + name: "Multipart upload", + multipart: &api.MultipartUploadParams{ + PartSize: 10, + CompleteURL: "http://complete", + AbortURL: "http://abort", + PartURLs: []string{"http://part1", "http://part2"}, + }, + }, + { + name: "Single upload with custom content type", + customPutHeaders: true, + putHeaders: map[string]string{"Content-Type": "image/jpeg"}, + }, { + name: "Multipart upload with custom content type", + multipart: &api.MultipartUploadParams{ + PartSize: 10, + CompleteURL: "http://complete", + AbortURL: "http://abort", + PartURLs: []string{"http://part1", "http://part2"}, + }, + customPutHeaders: true, + putHeaders: map[string]string{"Content-Type": "image/jpeg"}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + apiResponse := &api.Response{ + RemoteObject: api.RemoteObject{ + Timeout: 10, + ID: "id", + GetURL: "http://get", + StoreURL: "http://store", + DeleteURL: "http://delete", + MultipartUpload: test.multipart, + CustomPutHeaders: test.customPutHeaders, + PutHeaders: test.putHeaders, + }, + } + deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * 
time.Second) + opts, err := filestore.GetOpts(apiResponse) + require.NoError(t, err) + + require.Equal(t, apiResponse.TempPath, opts.LocalTempPath) + require.WithinDuration(t, deadline, opts.Deadline, time.Second) + require.Equal(t, apiResponse.RemoteObject.ID, opts.RemoteID) + require.Equal(t, apiResponse.RemoteObject.GetURL, opts.RemoteURL) + require.Equal(t, apiResponse.RemoteObject.StoreURL, opts.PresignedPut) + require.Equal(t, apiResponse.RemoteObject.DeleteURL, opts.PresignedDelete) + if test.customPutHeaders { + require.Equal(t, opts.PutHeaders, apiResponse.RemoteObject.PutHeaders) + } else { + require.Equal(t, opts.PutHeaders, map[string]string{"Content-Type": "application/octet-stream"}) + } + + if test.multipart == nil { + require.False(t, opts.IsMultipart()) + require.Empty(t, opts.PresignedCompleteMultipart) + require.Empty(t, opts.PresignedAbortMultipart) + require.Zero(t, opts.PartSize) + require.Empty(t, opts.PresignedParts) + } else { + require.True(t, opts.IsMultipart()) + require.Equal(t, test.multipart.CompleteURL, opts.PresignedCompleteMultipart) + require.Equal(t, test.multipart.AbortURL, opts.PresignedAbortMultipart) + require.Equal(t, test.multipart.PartSize, opts.PartSize) + require.Equal(t, test.multipart.PartURLs, opts.PresignedParts) + } + }) + } +} + +func TestGetOptsFail(t *testing.T) { + testCases := []struct { + desc string + in api.Response + }{ + { + desc: "neither local nor remote", + in: api.Response{}, + }, + { + desc: "both local and remote", + in: api.Response{TempPath: "/foobar", RemoteObject: api.RemoteObject{ID: "id"}}, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + _, err := filestore.GetOpts(&tc.in) + require.Error(t, err, "expect input to be rejected") + }) + } +} + +func TestGetOptsDefaultTimeout(t *testing.T) { + deadline := time.Now().Add(filestore.DefaultObjectStoreTimeout) + opts, err := filestore.GetOpts(&api.Response{TempPath: "/foo/bar"}) + require.NoError(t, err) + + 
require.WithinDuration(t, deadline, opts.Deadline, time.Minute) +} + +func TestUseWorkhorseClientEnabled(t *testing.T) { + cfg := filestore.ObjectStorageConfig{ + Provider: "AWS", + S3Config: config.S3Config{ + Bucket: "test-bucket", + Region: "test-region", + }, + S3Credentials: config.S3Credentials{ + AwsAccessKeyID: "test-key", + AwsSecretAccessKey: "test-secret", + }, + } + + missingCfg := cfg + missingCfg.S3Credentials = config.S3Credentials{} + + iamConfig := missingCfg + iamConfig.S3Config.UseIamProfile = true + + tests := []struct { + name string + UseWorkhorseClient bool + remoteTempObjectID string + objectStorageConfig filestore.ObjectStorageConfig + expected bool + }{ + { + name: "all direct access settings used", + UseWorkhorseClient: true, + remoteTempObjectID: "test-object", + objectStorageConfig: cfg, + expected: true, + }, + { + name: "missing AWS credentials", + UseWorkhorseClient: true, + remoteTempObjectID: "test-object", + objectStorageConfig: missingCfg, + expected: false, + }, + { + name: "direct access disabled", + UseWorkhorseClient: false, + remoteTempObjectID: "test-object", + objectStorageConfig: cfg, + expected: false, + }, + { + name: "with IAM instance profile", + UseWorkhorseClient: true, + remoteTempObjectID: "test-object", + objectStorageConfig: iamConfig, + expected: true, + }, + { + name: "missing remote temp object ID", + UseWorkhorseClient: true, + remoteTempObjectID: "", + objectStorageConfig: cfg, + expected: false, + }, + { + name: "missing S3 config", + UseWorkhorseClient: true, + remoteTempObjectID: "test-object", + expected: false, + }, + { + name: "missing S3 bucket", + UseWorkhorseClient: true, + remoteTempObjectID: "test-object", + objectStorageConfig: filestore.ObjectStorageConfig{ + Provider: "AWS", + S3Config: config.S3Config{}, + }, + expected: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + apiResponse := &api.Response{ + RemoteObject: api.RemoteObject{ + Timeout: 10, + 
ID: "id", + UseWorkhorseClient: test.UseWorkhorseClient, + RemoteTempObjectID: test.remoteTempObjectID, + }, + } + deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second) + opts, err := filestore.GetOpts(apiResponse) + require.NoError(t, err) + opts.ObjectStorageConfig = test.objectStorageConfig + + require.Equal(t, apiResponse.TempPath, opts.LocalTempPath) + require.WithinDuration(t, deadline, opts.Deadline, time.Second) + require.Equal(t, apiResponse.RemoteObject.ID, opts.RemoteID) + require.Equal(t, apiResponse.RemoteObject.UseWorkhorseClient, opts.UseWorkhorseClient) + require.Equal(t, test.expected, opts.UseWorkhorseClientEnabled()) + }) + } +} + +func TestGoCloudConfig(t *testing.T) { + mux, _, cleanup := test.SetupGoCloudFileBucket(t, "azblob") + defer cleanup() + + tests := []struct { + name string + provider string + url string + valid bool + }{ + { + name: "valid AzureRM config", + provider: "AzureRM", + url: "azblob:://test-container", + valid: true, + }, + { + name: "invalid GoCloud scheme", + provider: "AzureRM", + url: "unknown:://test-container", + valid: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + apiResponse := &api.Response{ + RemoteObject: api.RemoteObject{ + Timeout: 10, + ID: "id", + UseWorkhorseClient: true, + RemoteTempObjectID: "test-object", + ObjectStorage: &api.ObjectStorageParams{ + Provider: test.provider, + GoCloudConfig: config.GoCloudConfig{ + URL: test.url, + }, + }, + }, + } + deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second) + opts, err := filestore.GetOpts(apiResponse) + require.NoError(t, err) + opts.ObjectStorageConfig.URLMux = mux + + require.Equal(t, apiResponse.TempPath, opts.LocalTempPath) + require.Equal(t, apiResponse.RemoteObject.RemoteTempObjectID, opts.RemoteTempObjectID) + require.WithinDuration(t, deadline, opts.Deadline, time.Second) + require.Equal(t, apiResponse.RemoteObject.ID, opts.RemoteID) + 
require.Equal(t, apiResponse.RemoteObject.UseWorkhorseClient, opts.UseWorkhorseClient) + require.Equal(t, test.provider, opts.ObjectStorageConfig.Provider) + require.Equal(t, apiResponse.RemoteObject.ObjectStorage.GoCloudConfig, opts.ObjectStorageConfig.GoCloudConfig) + require.True(t, opts.UseWorkhorseClientEnabled()) + require.Equal(t, test.valid, opts.ObjectStorageConfig.IsValid()) + require.False(t, opts.IsLocal()) + }) + } +} |