Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'workhorse/internal/upload/destination')
-rw-r--r--workhorse/internal/upload/destination/destination.go10
-rw-r--r--workhorse/internal/upload/destination/destination_test.go148
-rw-r--r--workhorse/internal/upload/destination/filestore/filestore.go4
-rw-r--r--workhorse/internal/upload/destination/multi_hash.go39
-rw-r--r--workhorse/internal/upload/destination/multi_hash_test.go52
-rw-r--r--workhorse/internal/upload/destination/objectstore/gocloud_object_test.go11
-rw-r--r--workhorse/internal/upload/destination/objectstore/multipart.go10
-rw-r--r--workhorse/internal/upload/destination/objectstore/multipart_test.go5
-rw-r--r--workhorse/internal/upload/destination/objectstore/object_test.go11
-rw-r--r--workhorse/internal/upload/destination/objectstore/s3_complete_multipart_api.go38
-rw-r--r--workhorse/internal/upload/destination/objectstore/s3_object_test.go11
-rw-r--r--workhorse/internal/upload/destination/objectstore/s3api/s3api.go37
-rw-r--r--workhorse/internal/upload/destination/objectstore/test/objectstore_stub.go10
-rw-r--r--workhorse/internal/upload/destination/objectstore/uploader.go34
-rw-r--r--workhorse/internal/upload/destination/upload_opts.go26
-rw-r--r--workhorse/internal/upload/destination/upload_opts_test.go25
16 files changed, 289 insertions, 182 deletions
diff --git a/workhorse/internal/upload/destination/destination.go b/workhorse/internal/upload/destination/destination.go
index 5e145e2cb2a..a9fb81540d5 100644
--- a/workhorse/internal/upload/destination/destination.go
+++ b/workhorse/internal/upload/destination/destination.go
@@ -108,6 +108,7 @@ func (fh *FileHandler) GitLabFinalizeFields(prefix string) (map[string]string, e
type consumer interface {
Consume(context.Context, io.Reader, time.Time) (int64, error)
+ ConsumeWithoutDelete(context.Context, io.Reader, time.Time) (int64, error)
}
// Upload persists the provided reader content to all the location specified in opts. A cleanup will be performed once ctx is Done
@@ -120,7 +121,7 @@ func Upload(ctx context.Context, reader io.Reader, size int64, name string, opts
}
uploadStartTime := time.Now()
defer func() { fh.uploadDuration = time.Since(uploadStartTime).Seconds() }()
- hashes := newMultiHash()
+ hashes := newMultiHash(opts.UploadHashFunctions)
reader = io.TeeReader(reader, hashes.Writer)
var clientMode string
@@ -185,7 +186,12 @@ func Upload(ctx context.Context, reader io.Reader, size int64, name string, opts
reader = hlr
}
- fh.Size, err = uploadDestination.Consume(ctx, reader, opts.Deadline)
+ if opts.SkipDelete {
+ fh.Size, err = uploadDestination.ConsumeWithoutDelete(ctx, reader, opts.Deadline)
+ } else {
+ fh.Size, err = uploadDestination.Consume(ctx, reader, opts.Deadline)
+ }
+
if err != nil {
if (err == objectstore.ErrNotEnoughParts) || (hlr != nil && hlr.n < 0) {
err = ErrEntityTooLarge
diff --git a/workhorse/internal/upload/destination/destination_test.go b/workhorse/internal/upload/destination/destination_test.go
index 97645be168f..69dd02ca7c2 100644
--- a/workhorse/internal/upload/destination/destination_test.go
+++ b/workhorse/internal/upload/destination/destination_test.go
@@ -1,4 +1,4 @@
-package destination_test
+package destination
import (
"context"
@@ -17,12 +17,11 @@ import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
)
func testDeadline() time.Time {
- return time.Now().Add(destination.DefaultObjectStoreTimeout)
+ return time.Now().Add(DefaultObjectStoreTimeout)
}
func requireFileGetsRemovedAsync(t *testing.T, filePath string) {
@@ -44,10 +43,10 @@ func TestUploadWrongSize(t *testing.T) {
tmpFolder := t.TempDir()
- opts := &destination.UploadOpts{LocalTempPath: tmpFolder}
- fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize+1, "upload", opts)
+ opts := &UploadOpts{LocalTempPath: tmpFolder}
+ fh, err := Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize+1, "upload", opts)
require.Error(t, err)
- _, isSizeError := err.(destination.SizeError)
+ _, isSizeError := err.(SizeError)
require.True(t, isSizeError, "Should fail with SizeError")
require.Nil(t, fh)
}
@@ -58,10 +57,10 @@ func TestUploadWithKnownSizeExceedLimit(t *testing.T) {
tmpFolder := t.TempDir()
- opts := &destination.UploadOpts{LocalTempPath: tmpFolder, MaximumSize: test.ObjectSize - 1}
- fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", opts)
+ opts := &UploadOpts{LocalTempPath: tmpFolder, MaximumSize: test.ObjectSize - 1}
+ fh, err := Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", opts)
require.Error(t, err)
- _, isSizeError := err.(destination.SizeError)
+ _, isSizeError := err.(SizeError)
require.True(t, isSizeError, "Should fail with SizeError")
require.Nil(t, fh)
}
@@ -72,9 +71,9 @@ func TestUploadWithUnknownSizeExceedLimit(t *testing.T) {
tmpFolder := t.TempDir()
- opts := &destination.UploadOpts{LocalTempPath: tmpFolder, MaximumSize: test.ObjectSize - 1}
- fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), -1, "upload", opts)
- require.Equal(t, err, destination.ErrEntityTooLarge)
+ opts := &UploadOpts{LocalTempPath: tmpFolder, MaximumSize: test.ObjectSize - 1}
+ fh, err := Upload(ctx, strings.NewReader(test.ObjectContent), -1, "upload", opts)
+ require.Equal(t, err, ErrEntityTooLarge)
require.Nil(t, fh)
}
@@ -94,7 +93,7 @@ func TestUploadWrongETag(t *testing.T) {
objectURL := ts.URL + test.ObjectPath
- opts := &destination.UploadOpts{
+ opts := &UploadOpts{
RemoteID: "test-file",
RemoteURL: objectURL,
PresignedPut: objectURL + "?Signature=ASignature",
@@ -110,7 +109,7 @@ func TestUploadWrongETag(t *testing.T) {
osStub.InitiateMultipartUpload(test.ObjectPath)
}
ctx, cancel := context.WithCancel(context.Background())
- fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", opts)
+ fh, err := Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", opts)
require.Nil(t, fh)
require.Error(t, err)
require.Equal(t, 1, osStub.PutsCnt(), "File not uploaded")
@@ -135,18 +134,22 @@ func TestUpload(t *testing.T) {
tmpFolder := t.TempDir()
tests := []struct {
- name string
- local bool
- remote remote
+ name string
+ local bool
+ remote remote
+ skipDelete bool
}{
{name: "Local only", local: true},
{name: "Remote Single only", remote: remoteSingle},
{name: "Remote Multipart only", remote: remoteMultipart},
+ {name: "Local only With SkipDelete", local: true, skipDelete: true},
+ {name: "Remote Single only With SkipDelete", remote: remoteSingle, skipDelete: true},
+ {name: "Remote Multipart only With SkipDelete", remote: remoteMultipart, skipDelete: true},
}
for _, spec := range tests {
t.Run(spec.name, func(t *testing.T) {
- var opts destination.UploadOpts
+ var opts UploadOpts
var expectedDeletes, expectedPuts int
osStub, ts := test.StartObjectStore()
@@ -184,10 +187,16 @@ func TestUpload(t *testing.T) {
opts.LocalTempPath = tmpFolder
}
+ opts.SkipDelete = spec.skipDelete
+
+ if opts.SkipDelete {
+ expectedDeletes = 0
+ }
+
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts)
+ fh, err := Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts)
require.NoError(t, err)
require.NotNil(t, fh)
@@ -206,11 +215,7 @@ func TestUpload(t *testing.T) {
}
require.Equal(t, test.ObjectSize, fh.Size)
- if destination.FIPSEnabled() {
- require.Empty(t, fh.MD5())
- } else {
- require.Equal(t, test.ObjectMD5, fh.MD5())
- }
+ require.Equal(t, test.ObjectMD5, fh.MD5())
require.Equal(t, test.ObjectSHA256, fh.SHA256())
require.Equal(t, expectedPuts, osStub.PutsCnt(), "ObjectStore PutObject count mismatch")
@@ -255,7 +260,7 @@ func TestUploadWithS3WorkhorseClient(t *testing.T) {
name: "unknown object size with limit",
objectSize: -1,
maxSize: test.ObjectSize - 1,
- expectedErr: destination.ErrEntityTooLarge,
+ expectedErr: ErrEntityTooLarge,
},
}
@@ -269,12 +274,12 @@ func TestUploadWithS3WorkhorseClient(t *testing.T) {
defer cancel()
remoteObject := "tmp/test-file/1"
- opts := destination.UploadOpts{
+ opts := UploadOpts{
RemoteID: "test-file",
Deadline: testDeadline(),
UseWorkhorseClient: true,
RemoteTempObjectID: remoteObject,
- ObjectStorageConfig: destination.ObjectStorageConfig{
+ ObjectStorageConfig: ObjectStorageConfig{
Provider: "AWS",
S3Credentials: s3Creds,
S3Config: s3Config,
@@ -282,7 +287,7 @@ func TestUploadWithS3WorkhorseClient(t *testing.T) {
MaximumSize: tc.maxSize,
}
- _, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), tc.objectSize, "upload", &opts)
+ _, err := Upload(ctx, strings.NewReader(test.ObjectContent), tc.objectSize, "upload", &opts)
if tc.expectedErr == nil {
require.NoError(t, err)
@@ -302,19 +307,19 @@ func TestUploadWithAzureWorkhorseClient(t *testing.T) {
defer cancel()
remoteObject := "tmp/test-file/1"
- opts := destination.UploadOpts{
+ opts := UploadOpts{
RemoteID: "test-file",
Deadline: testDeadline(),
UseWorkhorseClient: true,
RemoteTempObjectID: remoteObject,
- ObjectStorageConfig: destination.ObjectStorageConfig{
+ ObjectStorageConfig: ObjectStorageConfig{
Provider: "AzureRM",
URLMux: mux,
GoCloudConfig: config.GoCloudConfig{URL: "azblob://test-container"},
},
}
- _, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts)
+ _, err := Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts)
require.NoError(t, err)
test.GoCloudObjectExists(t, bucketDir, remoteObject)
@@ -327,48 +332,65 @@ func TestUploadWithUnknownGoCloudScheme(t *testing.T) {
mux := new(blob.URLMux)
remoteObject := "tmp/test-file/1"
- opts := destination.UploadOpts{
+ opts := UploadOpts{
RemoteID: "test-file",
Deadline: testDeadline(),
UseWorkhorseClient: true,
RemoteTempObjectID: remoteObject,
- ObjectStorageConfig: destination.ObjectStorageConfig{
+ ObjectStorageConfig: ObjectStorageConfig{
Provider: "SomeCloud",
URLMux: mux,
GoCloudConfig: config.GoCloudConfig{URL: "foo://test-container"},
},
}
- _, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts)
+ _, err := Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts)
require.Error(t, err)
}
func TestUploadMultipartInBodyFailure(t *testing.T) {
- osStub, ts := test.StartObjectStore()
- defer ts.Close()
-
- // this is a broken path because it contains bucket name but no key
- // this is the only way to get an in-body failure from our ObjectStoreStub
- objectPath := "/bucket-but-no-object-key"
- objectURL := ts.URL + objectPath
- opts := destination.UploadOpts{
- RemoteID: "test-file",
- RemoteURL: objectURL,
- PartSize: test.ObjectSize,
- PresignedParts: []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"},
- PresignedCompleteMultipart: objectURL + "?Signature=CompleteSignature",
- Deadline: testDeadline(),
+ tests := []struct {
+ name string
+ skipDelete bool
+ }{
+ {name: "With skipDelete false", skipDelete: false},
+ {name: "With skipDelete true", skipDelete: true},
}
- osStub.InitiateMultipartUpload(objectPath)
+ for _, spec := range tests {
+ t.Run(spec.name, func(t *testing.T) {
+ osStub, ts := test.StartObjectStore()
+ defer ts.Close()
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
+ // this is a broken path because it contains bucket name but no key
+ // this is the only way to get an in-body failure from our ObjectStoreStub
+ objectPath := "/bucket-but-no-object-key"
+ objectURL := ts.URL + objectPath
+ opts := UploadOpts{
+ RemoteID: "test-file",
+ RemoteURL: objectURL,
+ PartSize: test.ObjectSize,
+ PresignedParts: []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"},
+ PresignedCompleteMultipart: objectURL + "?Signature=CompleteSignature",
+ PresignedDelete: objectURL + "?Signature=AnotherSignature",
+ Deadline: testDeadline(),
+ SkipDelete: spec.skipDelete,
+ }
- fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts)
- require.Nil(t, fh)
- require.Error(t, err)
- require.EqualError(t, err, test.MultipartUploadInternalError().Error())
+ osStub.InitiateMultipartUpload(objectPath)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ fh, err := Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts)
+ require.Nil(t, fh)
+ require.Error(t, err)
+ require.EqualError(t, err, test.MultipartUploadInternalError().Error())
+
+ cancel() // this will trigger an async cleanup
+ requireObjectStoreDeletedAsync(t, 1, osStub)
+ })
+ }
}
func TestUploadRemoteFileWithLimit(t *testing.T) {
@@ -405,20 +427,20 @@ func TestUploadRemoteFileWithLimit(t *testing.T) {
testData: test.ObjectContent,
objectSize: -1,
maxSize: test.ObjectSize - 1,
- expectedErr: destination.ErrEntityTooLarge,
+ expectedErr: ErrEntityTooLarge,
},
{
name: "large object with unknown size with limit",
testData: string(make([]byte, 20000)),
objectSize: -1,
maxSize: 19000,
- expectedErr: destination.ErrEntityTooLarge,
+ expectedErr: ErrEntityTooLarge,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
- var opts destination.UploadOpts
+ var opts UploadOpts
for _, remoteType := range remoteTypes {
osStub, ts := test.StartObjectStore()
@@ -454,7 +476,7 @@ func TestUploadRemoteFileWithLimit(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- fh, err := destination.Upload(ctx, strings.NewReader(tc.testData), tc.objectSize, "upload", &opts)
+ fh, err := Upload(ctx, strings.NewReader(tc.testData), tc.objectSize, "upload", &opts)
if tc.expectedErr == nil {
require.NoError(t, err)
@@ -468,7 +490,7 @@ func TestUploadRemoteFileWithLimit(t *testing.T) {
}
}
-func checkFileHandlerWithFields(t *testing.T, fh *destination.FileHandler, fields map[string]string, prefix string) {
+func checkFileHandlerWithFields(t *testing.T, fh *FileHandler, fields map[string]string, prefix string) {
key := func(field string) string {
if prefix == "" {
return field
@@ -482,11 +504,7 @@ func checkFileHandlerWithFields(t *testing.T, fh *destination.FileHandler, field
require.Equal(t, fh.RemoteURL, fields[key("remote_url")])
require.Equal(t, fh.RemoteID, fields[key("remote_id")])
require.Equal(t, strconv.FormatInt(test.ObjectSize, 10), fields[key("size")])
- if destination.FIPSEnabled() {
- require.Empty(t, fields[key("md5")])
- } else {
- require.Equal(t, test.ObjectMD5, fields[key("md5")])
- }
+ require.Equal(t, test.ObjectMD5, fields[key("md5")])
require.Equal(t, test.ObjectSHA1, fields[key("sha1")])
require.Equal(t, test.ObjectSHA256, fields[key("sha256")])
require.Equal(t, test.ObjectSHA512, fields[key("sha512")])
diff --git a/workhorse/internal/upload/destination/filestore/filestore.go b/workhorse/internal/upload/destination/filestore/filestore.go
index 2d88874bf25..6b2d8270b51 100644
--- a/workhorse/internal/upload/destination/filestore/filestore.go
+++ b/workhorse/internal/upload/destination/filestore/filestore.go
@@ -19,3 +19,7 @@ func (lf *LocalFile) Consume(_ context.Context, r io.Reader, _ time.Time) (int64
}
return n, err
}
+
+func (lf *LocalFile) ConsumeWithoutDelete(outerCtx context.Context, reader io.Reader, deadLine time.Time) (_ int64, err error) {
+ return lf.Consume(outerCtx, reader, deadLine)
+}
diff --git a/workhorse/internal/upload/destination/multi_hash.go b/workhorse/internal/upload/destination/multi_hash.go
index 8d5bf4424a8..3f8b0cbd903 100644
--- a/workhorse/internal/upload/destination/multi_hash.go
+++ b/workhorse/internal/upload/destination/multi_hash.go
@@ -8,9 +8,6 @@ import (
"encoding/hex"
"hash"
"io"
- "os"
-
- "gitlab.com/gitlab-org/labkit/fips"
)
var hashFactories = map[string](func() hash.Hash){
@@ -20,39 +17,39 @@ var hashFactories = map[string](func() hash.Hash){
"sha512": sha512.New,
}
-var fipsHashFactories = map[string](func() hash.Hash){
- "sha1": sha1.New,
- "sha256": sha256.New,
- "sha512": sha512.New,
-}
-
func factories() map[string](func() hash.Hash) {
- if FIPSEnabled() {
- return fipsHashFactories
- }
-
return hashFactories
}
-func FIPSEnabled() bool {
- if fips.Enabled() {
+type multiHash struct {
+ io.Writer
+ hashes map[string]hash.Hash
+}
+
+func permittedHashFunction(hashFunctions []string, hash string) bool {
+ if len(hashFunctions) == 0 {
return true
}
- return os.Getenv("WORKHORSE_TEST_FIPS_ENABLED") == "1"
-}
+ for _, name := range hashFunctions {
+ if name == hash {
+ return true
+ }
+ }
-type multiHash struct {
- io.Writer
- hashes map[string]hash.Hash
+ return false
}
-func newMultiHash() (m *multiHash) {
+func newMultiHash(hashFunctions []string) (m *multiHash) {
m = &multiHash{}
m.hashes = make(map[string]hash.Hash)
var writers []io.Writer
for hash, hashFactory := range factories() {
+ if !permittedHashFunction(hashFunctions, hash) {
+ continue
+ }
+
writer := hashFactory()
m.hashes[hash] = writer
diff --git a/workhorse/internal/upload/destination/multi_hash_test.go b/workhorse/internal/upload/destination/multi_hash_test.go
new file mode 100644
index 00000000000..9a976f5d25d
--- /dev/null
+++ b/workhorse/internal/upload/destination/multi_hash_test.go
@@ -0,0 +1,52 @@
+package destination
+
+import (
+ "sort"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestNewMultiHash(t *testing.T) {
+ tests := []struct {
+ name string
+ allowedHashes []string
+ expectedHashes []string
+ }{
+ {
+ name: "default",
+ allowedHashes: nil,
+ expectedHashes: []string{"md5", "sha1", "sha256", "sha512"},
+ },
+ {
+ name: "blank",
+ allowedHashes: []string{},
+ expectedHashes: []string{"md5", "sha1", "sha256", "sha512"},
+ },
+ {
+ name: "no MD5",
+ allowedHashes: []string{"sha1", "sha256", "sha512"},
+ expectedHashes: []string{"sha1", "sha256", "sha512"},
+ },
+
+ {
+ name: "unlisted hash",
+ allowedHashes: []string{"sha1", "sha256", "sha512", "sha3-256"},
+ expectedHashes: []string{"sha1", "sha256", "sha512"},
+ },
+ }
+
+ for _, test := range tests {
+ mh := newMultiHash(test.allowedHashes)
+
+ require.Equal(t, len(test.expectedHashes), len(mh.hashes))
+
+ var keys []string
+ for key := range mh.hashes {
+ keys = append(keys, key)
+ }
+
+ sort.Strings(keys)
+ require.Equal(t, test.expectedHashes, keys)
+ }
+}
diff --git a/workhorse/internal/upload/destination/objectstore/gocloud_object_test.go b/workhorse/internal/upload/destination/objectstore/gocloud_object_test.go
index 55d886087be..5a6a4b90b34 100644
--- a/workhorse/internal/upload/destination/objectstore/gocloud_object_test.go
+++ b/workhorse/internal/upload/destination/objectstore/gocloud_object_test.go
@@ -1,4 +1,4 @@
-package objectstore_test
+package objectstore
import (
"context"
@@ -10,7 +10,6 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
)
@@ -22,8 +21,8 @@ func TestGoCloudObjectUpload(t *testing.T) {
objectName := "test.png"
testURL := "azuretest://azure.example.com/test-container"
- p := &objectstore.GoCloudObjectParams{Ctx: ctx, Mux: mux, BucketURL: testURL, ObjectName: objectName}
- object, err := objectstore.NewGoCloudObject(p)
+ p := &GoCloudObjectParams{Ctx: ctx, Mux: mux, BucketURL: testURL, ObjectName: objectName}
+ object, err := NewGoCloudObject(p)
require.NotNil(t, object)
require.NoError(t, err)
@@ -48,8 +47,8 @@ func TestGoCloudObjectUpload(t *testing.T) {
if exists {
return fmt.Errorf("file %s is still present", objectName)
- } else {
- return nil
}
+
+ return nil
})
}
diff --git a/workhorse/internal/upload/destination/objectstore/multipart.go b/workhorse/internal/upload/destination/objectstore/multipart.go
index df336d2d901..900ca040dad 100644
--- a/workhorse/internal/upload/destination/objectstore/multipart.go
+++ b/workhorse/internal/upload/destination/objectstore/multipart.go
@@ -11,6 +11,8 @@ import (
"os"
"gitlab.com/gitlab-org/labkit/mask"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/s3api"
)
// ErrNotEnoughParts will be used when writing more than size * len(partURLs)
@@ -51,7 +53,7 @@ func NewMultipart(partURLs []string, completeURL, abortURL, deleteURL string, pu
}
func (m *Multipart) Upload(ctx context.Context, r io.Reader) error {
- cmu := &CompleteMultipartUpload{}
+ cmu := &s3api.CompleteMultipartUpload{}
for i, partURL := range m.PartURLs {
src := io.LimitReader(r, m.partSize)
part, err := m.readAndUploadOnePart(ctx, partURL, m.PutHeaders, src, i+1)
@@ -91,7 +93,7 @@ func (m *Multipart) Delete() {
deleteURL(m.DeleteURL)
}
-func (m *Multipart) readAndUploadOnePart(ctx context.Context, partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) {
+func (m *Multipart) readAndUploadOnePart(ctx context.Context, partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*s3api.CompleteMultipartUploadPart, error) {
file, err := os.CreateTemp("", "part-buffer")
if err != nil {
return nil, fmt.Errorf("create temporary buffer file: %v", err)
@@ -118,7 +120,7 @@ func (m *Multipart) readAndUploadOnePart(ctx context.Context, partURL string, pu
if err != nil {
return nil, fmt.Errorf("upload part %d: %v", partNumber, err)
}
- return &completeMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil
+ return &s3api.CompleteMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil
}
func (m *Multipart) uploadPart(ctx context.Context, url string, headers map[string]string, body io.Reader, size int64) (string, error) {
@@ -142,7 +144,7 @@ func (m *Multipart) uploadPart(ctx context.Context, url string, headers map[stri
return part.ETag(), nil
}
-func (m *Multipart) complete(ctx context.Context, cmu *CompleteMultipartUpload) error {
+func (m *Multipart) complete(ctx context.Context, cmu *s3api.CompleteMultipartUpload) error {
body, err := xml.Marshal(cmu)
if err != nil {
return fmt.Errorf("marshal CompleteMultipartUpload request: %v", err)
diff --git a/workhorse/internal/upload/destination/objectstore/multipart_test.go b/workhorse/internal/upload/destination/objectstore/multipart_test.go
index 2a5161e42e7..00244a5c50b 100644
--- a/workhorse/internal/upload/destination/objectstore/multipart_test.go
+++ b/workhorse/internal/upload/destination/objectstore/multipart_test.go
@@ -1,4 +1,4 @@
-package objectstore_test
+package objectstore
import (
"context"
@@ -11,7 +11,6 @@ import (
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
)
@@ -48,7 +47,7 @@ func TestMultipartUploadWithUpcaseETags(t *testing.T) {
deadline := time.Now().Add(testTimeout)
- m, err := objectstore.NewMultipart(
+ m, err := NewMultipart(
[]string{ts.URL}, // a single presigned part URL
ts.URL, // the complete multipart upload URL
"", // no abort
diff --git a/workhorse/internal/upload/destination/objectstore/object_test.go b/workhorse/internal/upload/destination/objectstore/object_test.go
index 24117891b6d..2b94cd9e3b1 100644
--- a/workhorse/internal/upload/destination/objectstore/object_test.go
+++ b/workhorse/internal/upload/destination/objectstore/object_test.go
@@ -1,4 +1,4 @@
-package objectstore_test
+package objectstore
import (
"context"
@@ -11,7 +11,6 @@ import (
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
)
@@ -35,7 +34,7 @@ func testObjectUploadNoErrors(t *testing.T, startObjectStore osFactory, useDelet
defer cancel()
deadline := time.Now().Add(testTimeout)
- object, err := objectstore.NewObject(objectURL, deleteURL, putHeaders, test.ObjectSize)
+ object, err := NewObject(objectURL, deleteURL, putHeaders, test.ObjectSize)
require.NoError(t, err)
// copy data
@@ -97,12 +96,12 @@ func TestObjectUpload404(t *testing.T) {
deadline := time.Now().Add(testTimeout)
objectURL := ts.URL + test.ObjectPath
- object, err := objectstore.NewObject(objectURL, "", map[string]string{}, test.ObjectSize)
+ object, err := NewObject(objectURL, "", map[string]string{}, test.ObjectSize)
require.NoError(t, err)
_, err = object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline)
require.Error(t, err)
- _, isStatusCodeError := err.(objectstore.StatusCodeError)
+ _, isStatusCodeError := err.(StatusCodeError)
require.True(t, isStatusCodeError, "Should fail with StatusCodeError")
require.Contains(t, err.Error(), "404")
}
@@ -140,7 +139,7 @@ func TestObjectUploadBrokenConnection(t *testing.T) {
deadline := time.Now().Add(testTimeout)
objectURL := ts.URL + test.ObjectPath
- object, err := objectstore.NewObject(objectURL, "", map[string]string{}, -1)
+ object, err := NewObject(objectURL, "", map[string]string{}, -1)
require.NoError(t, err)
_, copyErr := object.Consume(ctx, &endlessReader{}, deadline)
diff --git a/workhorse/internal/upload/destination/objectstore/s3_complete_multipart_api.go b/workhorse/internal/upload/destination/objectstore/s3_complete_multipart_api.go
index b84f5757f49..02799d0b9b0 100644
--- a/workhorse/internal/upload/destination/objectstore/s3_complete_multipart_api.go
+++ b/workhorse/internal/upload/destination/objectstore/s3_complete_multipart_api.go
@@ -2,45 +2,15 @@ package objectstore
import (
"encoding/xml"
- "fmt"
-)
-
-// CompleteMultipartUpload is the S3 CompleteMultipartUpload body
-type CompleteMultipartUpload struct {
- Part []*completeMultipartUploadPart
-}
-type completeMultipartUploadPart struct {
- PartNumber int
- ETag string
-}
-
-// CompleteMultipartUploadResult is the S3 answer to CompleteMultipartUpload request
-type CompleteMultipartUploadResult struct {
- Location string
- Bucket string
- Key string
- ETag string
-}
-
-// CompleteMultipartUploadError is the in-body error structure
-// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html#mpUploadComplete-examples
-// the answer contains other fields we are not using
-type CompleteMultipartUploadError struct {
- XMLName xml.Name `xml:"Error"`
- Code string
- Message string
-}
-
-func (c *CompleteMultipartUploadError) Error() string {
- return fmt.Sprintf("CompleteMultipartUpload remote error %q: %s", c.Code, c.Message)
-}
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/s3api"
+)
// compoundCompleteMultipartUploadResult holds both CompleteMultipartUploadResult and CompleteMultipartUploadError
// this allow us to deserialize the response body where the root element can either be Error orCompleteMultipartUploadResult
type compoundCompleteMultipartUploadResult struct {
- *CompleteMultipartUploadResult
- *CompleteMultipartUploadError
+ *s3api.CompleteMultipartUploadResult
+ *s3api.CompleteMultipartUploadError
// XMLName this overrides CompleteMultipartUploadError.XMLName tags
XMLName xml.Name
diff --git a/workhorse/internal/upload/destination/objectstore/s3_object_test.go b/workhorse/internal/upload/destination/objectstore/s3_object_test.go
index 0ed14a2e844..c99712d18ad 100644
--- a/workhorse/internal/upload/destination/objectstore/s3_object_test.go
+++ b/workhorse/internal/upload/destination/objectstore/s3_object_test.go
@@ -1,4 +1,4 @@
-package objectstore_test
+package objectstore
import (
"context"
@@ -17,7 +17,6 @@ import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
)
@@ -50,7 +49,7 @@ func TestS3ObjectUpload(t *testing.T) {
objectName := filepath.Join(tmpDir, "s3-test-data")
ctx, cancel := context.WithCancel(context.Background())
- object, err := objectstore.NewS3Object(objectName, creds, config)
+ object, err := NewS3Object(objectName, creds, config)
require.NoError(t, err)
// copy data
@@ -107,7 +106,7 @@ func TestConcurrentS3ObjectUpload(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- object, err := objectstore.NewS3Object(objectName, creds, config)
+ object, err := NewS3Object(objectName, creds, config)
require.NoError(t, err)
// copy data
@@ -134,7 +133,7 @@ func TestS3ObjectUploadCancel(t *testing.T) {
objectName := filepath.Join(tmpDir, "s3-test-data")
- object, err := objectstore.NewS3Object(objectName, creds, config)
+ object, err := NewS3Object(objectName, creds, config)
require.NoError(t, err)
@@ -155,7 +154,7 @@ func TestS3ObjectUploadLimitReached(t *testing.T) {
tmpDir := t.TempDir()
objectName := filepath.Join(tmpDir, "s3-test-data")
- object, err := objectstore.NewS3Object(objectName, creds, config)
+ object, err := NewS3Object(objectName, creds, config)
require.NoError(t, err)
_, err = object.Consume(context.Background(), &failedReader{}, deadline)
diff --git a/workhorse/internal/upload/destination/objectstore/s3api/s3api.go b/workhorse/internal/upload/destination/objectstore/s3api/s3api.go
new file mode 100644
index 00000000000..49ab9347911
--- /dev/null
+++ b/workhorse/internal/upload/destination/objectstore/s3api/s3api.go
@@ -0,0 +1,37 @@
+package s3api
+
+import (
+ "encoding/xml"
+ "fmt"
+)
+
+// CompleteMultipartUploadError is the in-body error structure
+// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html#mpUploadComplete-examples
+// the answer contains other fields we are not using
+type CompleteMultipartUploadError struct {
+ XMLName xml.Name `xml:"Error"`
+ Code string
+ Message string
+}
+
+func (c *CompleteMultipartUploadError) Error() string {
+ return fmt.Sprintf("CompleteMultipartUpload remote error %q: %s", c.Code, c.Message)
+}
+
+// CompleteMultipartUploadResult is the S3 answer to CompleteMultipartUpload request
+type CompleteMultipartUploadResult struct {
+ Location string
+ Bucket string
+ Key string
+ ETag string
+}
+
+// CompleteMultipartUpload is the S3 CompleteMultipartUpload body
+type CompleteMultipartUpload struct {
+ Part []*CompleteMultipartUploadPart
+}
+
+type CompleteMultipartUploadPart struct {
+ PartNumber int
+ ETag string
+}
diff --git a/workhorse/internal/upload/destination/objectstore/test/objectstore_stub.go b/workhorse/internal/upload/destination/objectstore/test/objectstore_stub.go
index 1a380bd5083..8fbb746d6ce 100644
--- a/workhorse/internal/upload/destination/objectstore/test/objectstore_stub.go
+++ b/workhorse/internal/upload/destination/objectstore/test/objectstore_stub.go
@@ -12,7 +12,7 @@ import (
"strings"
"sync"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/s3api"
)
type partsEtagMap map[int]string
@@ -190,8 +190,8 @@ func (o *ObjectstoreStub) putObject(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
}
-func MultipartUploadInternalError() *objectstore.CompleteMultipartUploadError {
- return &objectstore.CompleteMultipartUploadError{Code: "InternalError", Message: "malformed object path"}
+func MultipartUploadInternalError() *s3api.CompleteMultipartUploadError {
+ return &s3api.CompleteMultipartUploadError{Code: "InternalError", Message: "malformed object path"}
}
func (o *ObjectstoreStub) completeMultipartUpload(w http.ResponseWriter, r *http.Request) {
@@ -212,7 +212,7 @@ func (o *ObjectstoreStub) completeMultipartUpload(w http.ResponseWriter, r *http
return
}
- var msg objectstore.CompleteMultipartUpload
+ var msg s3api.CompleteMultipartUpload
err = xml.Unmarshal(buf, &msg)
if err != nil {
http.Error(w, err.Error(), 400)
@@ -245,7 +245,7 @@ func (o *ObjectstoreStub) completeMultipartUpload(w http.ResponseWriter, r *http
bucket := split[0]
key := split[1]
- answer := objectstore.CompleteMultipartUploadResult{
+ answer := s3api.CompleteMultipartUploadResult{
Location: r.URL.String(),
Bucket: bucket,
Key: key,
diff --git a/workhorse/internal/upload/destination/objectstore/uploader.go b/workhorse/internal/upload/destination/objectstore/uploader.go
index 43e573872ee..798a693aa93 100644
--- a/workhorse/internal/upload/destination/objectstore/uploader.go
+++ b/workhorse/internal/upload/destination/objectstore/uploader.go
@@ -38,11 +38,21 @@ func newETagCheckUploader(strategy uploadStrategy, metrics bool) *uploader {
func hexString(h hash.Hash) string { return hex.EncodeToString(h.Sum(nil)) }
+func (u *uploader) Consume(outerCtx context.Context, reader io.Reader, deadLine time.Time) (_ int64, err error) {
+ return u.consume(outerCtx, reader, deadLine, false)
+}
+
+func (u *uploader) ConsumeWithoutDelete(outerCtx context.Context, reader io.Reader, deadLine time.Time) (_ int64, err error) {
+ return u.consume(outerCtx, reader, deadLine, true)
+}
+
// Consume reads the reader until it reaches EOF or an error. It spawns a
// goroutine that waits for outerCtx to be done, after which the remote
// file is deleted. The deadline applies to the upload performed inside
// Consume, not to outerCtx.
-func (u *uploader) Consume(outerCtx context.Context, reader io.Reader, deadline time.Time) (_ int64, err error) {
+// SkipDelete optionaly call the Delete() function on the strategy once
+// rails is done handling the upload request.
+func (u *uploader) consume(outerCtx context.Context, reader io.Reader, deadLine time.Time, skipDelete bool) (_ int64, err error) {
if u.metrics {
objectStorageUploadsOpen.Inc()
defer func(started time.Time) {
@@ -59,17 +69,25 @@ func (u *uploader) Consume(outerCtx context.Context, reader io.Reader, deadline
// "delete" them.
if err != nil {
u.strategy.Abort()
+
+ if skipDelete {
+ // skipDelete avoided the object removal (see the goroutine below). Make
+ // here that the object is deleted if aborted.
+ u.strategy.Delete()
+ }
}
}()
- go func() {
- // Once gitlab-rails is done handling the request, we are supposed to
- // delete the upload from its temporary location.
- <-outerCtx.Done()
- u.strategy.Delete()
- }()
+ if !skipDelete {
+ go func() {
+ // Once gitlab-rails is done handling the request, we are supposed to
+ // delete the upload from its temporary location.
+ <-outerCtx.Done()
+ u.strategy.Delete()
+ }()
+ }
- uploadCtx, cancelFn := context.WithDeadline(outerCtx, deadline)
+ uploadCtx, cancelFn := context.WithDeadline(outerCtx, deadLine)
defer cancelFn()
var hasher hash.Hash
diff --git a/workhorse/internal/upload/destination/upload_opts.go b/workhorse/internal/upload/destination/upload_opts.go
index 58427b38b30..72efaebc16c 100644
--- a/workhorse/internal/upload/destination/upload_opts.go
+++ b/workhorse/internal/upload/destination/upload_opts.go
@@ -39,6 +39,8 @@ type UploadOpts struct {
PresignedPut string
// PresignedDelete is a presigned S3 DeleteObject compatible URL.
PresignedDelete string
+ // Whether Workhorse needs to delete the temporary object or not.
+ SkipDelete bool
// HTTP headers to be sent along with PUT request
PutHeaders map[string]string
// Whether to ignore Rails pre-signed URLs and have Workhorse directly access object storage provider
@@ -61,6 +63,8 @@ type UploadOpts struct {
PresignedCompleteMultipart string
// PresignedAbortMultipart is a presigned URL for AbortMultipartUpload
PresignedAbortMultipart string
+ // UploadHashFunctions contains a list of allowed hash functions (md5, sha1, etc.)
+ UploadHashFunctions []string
}
// UseWorkhorseClientEnabled checks if the options require direct access to object storage
@@ -90,16 +94,18 @@ func GetOpts(apiResponse *api.Response) (*UploadOpts, error) {
}
opts := UploadOpts{
- LocalTempPath: apiResponse.TempPath,
- RemoteID: apiResponse.RemoteObject.ID,
- RemoteURL: apiResponse.RemoteObject.GetURL,
- PresignedPut: apiResponse.RemoteObject.StoreURL,
- PresignedDelete: apiResponse.RemoteObject.DeleteURL,
- PutHeaders: apiResponse.RemoteObject.PutHeaders,
- UseWorkhorseClient: apiResponse.RemoteObject.UseWorkhorseClient,
- RemoteTempObjectID: apiResponse.RemoteObject.RemoteTempObjectID,
- Deadline: time.Now().Add(timeout),
- MaximumSize: apiResponse.MaximumSize,
+ LocalTempPath: apiResponse.TempPath,
+ RemoteID: apiResponse.RemoteObject.ID,
+ RemoteURL: apiResponse.RemoteObject.GetURL,
+ PresignedPut: apiResponse.RemoteObject.StoreURL,
+ PresignedDelete: apiResponse.RemoteObject.DeleteURL,
+ SkipDelete: apiResponse.RemoteObject.SkipDelete,
+ PutHeaders: apiResponse.RemoteObject.PutHeaders,
+ UseWorkhorseClient: apiResponse.RemoteObject.UseWorkhorseClient,
+ RemoteTempObjectID: apiResponse.RemoteObject.RemoteTempObjectID,
+ Deadline: time.Now().Add(timeout),
+ MaximumSize: apiResponse.MaximumSize,
+ UploadHashFunctions: apiResponse.UploadHashFunctions,
}
if opts.LocalTempPath != "" && opts.RemoteID != "" {
diff --git a/workhorse/internal/upload/destination/upload_opts_test.go b/workhorse/internal/upload/destination/upload_opts_test.go
index fd9e56db194..0fda3bd2381 100644
--- a/workhorse/internal/upload/destination/upload_opts_test.go
+++ b/workhorse/internal/upload/destination/upload_opts_test.go
@@ -1,4 +1,4 @@
-package destination_test
+package destination
import (
"testing"
@@ -8,7 +8,6 @@ import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination/objectstore/test"
)
@@ -43,7 +42,7 @@ func TestUploadOptsLocalAndRemote(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
- opts := destination.UploadOpts{
+ opts := UploadOpts{
LocalTempPath: test.localTempPath,
PresignedPut: test.presignedPut,
PartSize: test.partSize,
@@ -103,15 +102,17 @@ func TestGetOpts(t *testing.T) {
MultipartUpload: test.multipart,
CustomPutHeaders: test.customPutHeaders,
PutHeaders: test.putHeaders,
+ SkipDelete: true,
},
}
deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second)
- opts, err := destination.GetOpts(apiResponse)
+ opts, err := GetOpts(apiResponse)
require.NoError(t, err)
require.Equal(t, apiResponse.TempPath, opts.LocalTempPath)
require.WithinDuration(t, deadline, opts.Deadline, time.Second)
require.Equal(t, apiResponse.RemoteObject.ID, opts.RemoteID)
+ require.Equal(t, apiResponse.RemoteObject.SkipDelete, opts.SkipDelete)
require.Equal(t, apiResponse.RemoteObject.GetURL, opts.RemoteURL)
require.Equal(t, apiResponse.RemoteObject.StoreURL, opts.PresignedPut)
require.Equal(t, apiResponse.RemoteObject.DeleteURL, opts.PresignedDelete)
@@ -155,22 +156,22 @@ func TestGetOptsFail(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
- _, err := destination.GetOpts(tc.in)
+ _, err := GetOpts(tc.in)
require.Error(t, err, "expect input to be rejected")
})
}
}
func TestGetOptsDefaultTimeout(t *testing.T) {
- deadline := time.Now().Add(destination.DefaultObjectStoreTimeout)
- opts, err := destination.GetOpts(&api.Response{TempPath: "/foo/bar"})
+ deadline := time.Now().Add(DefaultObjectStoreTimeout)
+ opts, err := GetOpts(&api.Response{TempPath: "/foo/bar"})
require.NoError(t, err)
require.WithinDuration(t, deadline, opts.Deadline, time.Minute)
}
func TestUseWorkhorseClientEnabled(t *testing.T) {
- cfg := destination.ObjectStorageConfig{
+ cfg := ObjectStorageConfig{
Provider: "AWS",
S3Config: config.S3Config{
Bucket: "test-bucket",
@@ -195,7 +196,7 @@ func TestUseWorkhorseClientEnabled(t *testing.T) {
name string
UseWorkhorseClient bool
remoteTempObjectID string
- objectStorageConfig destination.ObjectStorageConfig
+ objectStorageConfig ObjectStorageConfig
expected bool
}{
{
@@ -243,7 +244,7 @@ func TestUseWorkhorseClientEnabled(t *testing.T) {
name: "missing S3 bucket",
UseWorkhorseClient: true,
remoteTempObjectID: "test-object",
- objectStorageConfig: destination.ObjectStorageConfig{
+ objectStorageConfig: ObjectStorageConfig{
Provider: "AWS",
S3Config: config.S3Config{},
},
@@ -269,7 +270,7 @@ func TestUseWorkhorseClientEnabled(t *testing.T) {
},
}
deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second)
- opts, err := destination.GetOpts(apiResponse)
+ opts, err := GetOpts(apiResponse)
require.NoError(t, err)
opts.ObjectStorageConfig = test.objectStorageConfig
@@ -322,7 +323,7 @@ func TestGoCloudConfig(t *testing.T) {
},
}
deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second)
- opts, err := destination.GetOpts(apiResponse)
+ opts, err := GetOpts(apiResponse)
require.NoError(t, err)
opts.ObjectStorageConfig.URLMux = mux