gitlab.com/gitlab-org/gitlab-foss.git
Diffstat (limited to 'workhorse/internal/objectstore')
-rw-r--r--  workhorse/internal/objectstore/gocloud_object.go  100
-rw-r--r--  workhorse/internal/objectstore/gocloud_object_test.go  56
-rw-r--r--  workhorse/internal/objectstore/multipart.go  188
-rw-r--r--  workhorse/internal/objectstore/multipart_test.go  64
-rw-r--r--  workhorse/internal/objectstore/object.go  114
-rw-r--r--  workhorse/internal/objectstore/object_test.go  155
-rw-r--r--  workhorse/internal/objectstore/prometheus.go  39
-rw-r--r--  workhorse/internal/objectstore/s3_complete_multipart_api.go  51
-rw-r--r--  workhorse/internal/objectstore/s3_object.go  119
-rw-r--r--  workhorse/internal/objectstore/s3_object_test.go  174
-rw-r--r--  workhorse/internal/objectstore/s3_session.go  94
-rw-r--r--  workhorse/internal/objectstore/s3_session_test.go  57
-rw-r--r--  workhorse/internal/objectstore/test/consts.go  19
-rw-r--r--  workhorse/internal/objectstore/test/gocloud_stub.go  47
-rw-r--r--  workhorse/internal/objectstore/test/objectstore_stub.go  278
-rw-r--r--  workhorse/internal/objectstore/test/objectstore_stub_test.go  167
-rw-r--r--  workhorse/internal/objectstore/test/s3_stub.go  142
-rw-r--r--  workhorse/internal/objectstore/upload_strategy.go  46
-rw-r--r--  workhorse/internal/objectstore/uploader.go  115
19 files changed, 2025 insertions, 0 deletions
diff --git a/workhorse/internal/objectstore/gocloud_object.go b/workhorse/internal/objectstore/gocloud_object.go
new file mode 100644
index 00000000000..38545086994
--- /dev/null
+++ b/workhorse/internal/objectstore/gocloud_object.go
@@ -0,0 +1,100 @@
+package objectstore
+
+import (
+ "context"
+ "io"
+ "time"
+
+ "gitlab.com/gitlab-org/labkit/log"
+ "gocloud.dev/blob"
+ "gocloud.dev/gcerrors"
+)
+
+type GoCloudObject struct {
+ bucket *blob.Bucket
+ mux *blob.URLMux
+ bucketURL string
+ objectName string
+ *uploader
+}
+
+type GoCloudObjectParams struct {
+ Ctx context.Context
+ Mux *blob.URLMux
+ BucketURL string
+ ObjectName string
+}
+
+func NewGoCloudObject(p *GoCloudObjectParams) (*GoCloudObject, error) {
+ bucket, err := p.Mux.OpenBucket(p.Ctx, p.BucketURL)
+ if err != nil {
+ return nil, err
+ }
+
+ o := &GoCloudObject{
+ bucket: bucket,
+ mux: p.Mux,
+ bucketURL: p.BucketURL,
+ objectName: p.ObjectName,
+ }
+
+ o.uploader = newUploader(o)
+ return o, nil
+}
+
+func (o *GoCloudObject) Upload(ctx context.Context, r io.Reader) error {
+ defer o.bucket.Close()
+
+ writer, err := o.bucket.NewWriter(ctx, o.objectName, nil)
+ if err != nil {
+ log.ContextLogger(ctx).WithError(err).Error("error creating GoCloud bucket")
+ return err
+ }
+
+ if _, err = io.Copy(writer, r); err != nil {
+ log.ContextLogger(ctx).WithError(err).Error("error writing to GoCloud bucket")
+ writer.Close()
+ return err
+ }
+
+ if err := writer.Close(); err != nil {
+ log.ContextLogger(ctx).WithError(err).Error("error closing GoCloud bucket")
+ return err
+ }
+
+ return nil
+}
+
+func (o *GoCloudObject) ETag() string {
+ return ""
+}
+
+func (o *GoCloudObject) Abort() {
+ o.Delete()
+}
+
+// Delete will always attempt to delete the temporary file.
+// According to https://github.com/google/go-cloud/blob/7818961b5c9a112f7e092d3a2d8479cbca80d187/blob/azureblob/azureblob.go#L881-L883,
+// if the writer is closed before any Write is called, Close will create an empty file.
+func (o *GoCloudObject) Delete() {
+ if o.bucketURL == "" || o.objectName == "" {
+ return
+ }
+
+ // Note we can't use the request context because in a successful
+ // case, the original request has already completed.
+ deleteCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // lint:allow context.Background
+ defer cancel()
+
+ bucket, err := o.mux.OpenBucket(deleteCtx, o.bucketURL)
+ if err != nil {
+ log.WithError(err).Error("error opening bucket for delete")
+ return
+ }
+
+ if err := bucket.Delete(deleteCtx, o.objectName); err != nil {
+ if gcerrors.Code(err) != gcerrors.NotFound {
+ log.WithError(err).Error("error deleting object")
+ }
+ }
+}
diff --git a/workhorse/internal/objectstore/gocloud_object_test.go b/workhorse/internal/objectstore/gocloud_object_test.go
new file mode 100644
index 00000000000..4dc9d2d75cc
--- /dev/null
+++ b/workhorse/internal/objectstore/gocloud_object_test.go
@@ -0,0 +1,56 @@
+package objectstore_test
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/testhelper"
+)
+
+func TestGoCloudObjectUpload(t *testing.T) {
+ mux, _, cleanup := test.SetupGoCloudFileBucket(t, "azuretest")
+ defer cleanup()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ deadline := time.Now().Add(testTimeout)
+
+ objectName := "test.png"
+ testURL := "azuretest://azure.example.com/test-container"
+ p := &objectstore.GoCloudObjectParams{Ctx: ctx, Mux: mux, BucketURL: testURL, ObjectName: objectName}
+ object, err := objectstore.NewGoCloudObject(p)
+ require.NotNil(t, object)
+ require.NoError(t, err)
+
+ // copy data
+ n, err := object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline)
+ require.NoError(t, err)
+ require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch")
+
+ bucket, err := mux.OpenBucket(ctx, testURL)
+ require.NoError(t, err)
+
+ // Verify the data was copied correctly.
+ received, err := bucket.ReadAll(ctx, objectName)
+ require.NoError(t, err)
+ require.Equal(t, []byte(test.ObjectContent), received)
+
+ cancel()
+
+ testhelper.Retry(t, 5*time.Second, func() error {
+ exists, err := bucket.Exists(ctx, objectName)
+ require.NoError(t, err)
+
+ if exists {
+ return fmt.Errorf("file %s is still present", objectName)
+ } else {
+ return nil
+ }
+ })
+}
diff --git a/workhorse/internal/objectstore/multipart.go b/workhorse/internal/objectstore/multipart.go
new file mode 100644
index 00000000000..fd1c0ed487d
--- /dev/null
+++ b/workhorse/internal/objectstore/multipart.go
@@ -0,0 +1,188 @@
+package objectstore
+
+import (
+ "bytes"
+ "context"
+ "encoding/xml"
+ "errors"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "os"
+
+ "gitlab.com/gitlab-org/labkit/log"
+ "gitlab.com/gitlab-org/labkit/mask"
+)
+
+// ErrNotEnoughParts will be returned when writing more than partSize * len(PartURLs) bytes
+var ErrNotEnoughParts = errors.New("not enough Parts")
+
+// Multipart represents a MultipartUpload on an S3 compatible Object Store service.
+// It can be used as io.WriteCloser for uploading an object
+type Multipart struct {
+ PartURLs []string
+ // CompleteURL is a presigned URL for CompleteMultipartUpload
+ CompleteURL string
+ // AbortURL is a presigned URL for AbortMultipartUpload
+ AbortURL string
+ // DeleteURL is a presigned URL for RemoveObject
+ DeleteURL string
+ PutHeaders map[string]string
+ partSize int64
+ etag string
+
+ *uploader
+}
+
+// NewMultipart provides a Multipart pointer that can be used for uploading. Data written will be split into parts, buffered on disk up to partSize bytes,
+// then uploaded with S3 UploadPart. Once all parts have been uploaded, a final call to CompleteMultipartUpload will be sent.
+// In case of any error, a call to AbortMultipartUpload will be made to clean up all the resources.
+func NewMultipart(partURLs []string, completeURL, abortURL, deleteURL string, putHeaders map[string]string, partSize int64) (*Multipart, error) {
+ m := &Multipart{
+ PartURLs: partURLs,
+ CompleteURL: completeURL,
+ AbortURL: abortURL,
+ DeleteURL: deleteURL,
+ PutHeaders: putHeaders,
+ partSize: partSize,
+ }
+
+ m.uploader = newUploader(m)
+ return m, nil
+}
+
+func (m *Multipart) Upload(ctx context.Context, r io.Reader) error {
+ cmu := &CompleteMultipartUpload{}
+ for i, partURL := range m.PartURLs {
+ src := io.LimitReader(r, m.partSize)
+ part, err := m.readAndUploadOnePart(ctx, partURL, m.PutHeaders, src, i+1)
+ if err != nil {
+ return err
+ }
+ if part == nil {
+ break
+ } else {
+ cmu.Part = append(cmu.Part, part)
+ }
+ }
+
+ n, err := io.Copy(ioutil.Discard, r)
+ if err != nil {
+ return fmt.Errorf("drain pipe: %v", err)
+ }
+ if n > 0 {
+ return ErrNotEnoughParts
+ }
+
+ if err := m.complete(ctx, cmu); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (m *Multipart) ETag() string {
+ return m.etag
+}
+func (m *Multipart) Abort() {
+ deleteURL(m.AbortURL)
+}
+
+func (m *Multipart) Delete() {
+ deleteURL(m.DeleteURL)
+}
+
+func (m *Multipart) readAndUploadOnePart(ctx context.Context, partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) {
+ file, err := ioutil.TempFile("", "part-buffer")
+ if err != nil {
+ return nil, fmt.Errorf("create temporary buffer file: %v", err)
+ }
+ defer func(path string) {
+ if err := os.Remove(path); err != nil {
+ log.WithError(err).WithField("file", path).Warning("Unable to delete temporary file")
+ }
+ }(file.Name())
+
+ n, err := io.Copy(file, src)
+ if err != nil {
+ return nil, err
+ }
+ if n == 0 {
+ return nil, nil
+ }
+
+ if _, err = file.Seek(0, io.SeekStart); err != nil {
+ return nil, fmt.Errorf("rewind part %d temporary dump : %v", partNumber, err)
+ }
+
+ etag, err := m.uploadPart(ctx, partURL, putHeaders, file, n)
+ if err != nil {
+ return nil, fmt.Errorf("upload part %d: %v", partNumber, err)
+ }
+ return &completeMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil
+}
+
+func (m *Multipart) uploadPart(ctx context.Context, url string, headers map[string]string, body io.Reader, size int64) (string, error) {
+ deadline, ok := ctx.Deadline()
+ if !ok {
+ return "", fmt.Errorf("missing deadline")
+ }
+
+ part, err := newObject(url, "", headers, size, false)
+ if err != nil {
+ return "", err
+ }
+
+ if n, err := part.Consume(ctx, io.LimitReader(body, size), deadline); err != nil || n < size {
+ if err == nil {
+ err = io.ErrUnexpectedEOF
+ }
+ return "", err
+ }
+
+ return part.ETag(), nil
+}
+
+func (m *Multipart) complete(ctx context.Context, cmu *CompleteMultipartUpload) error {
+ body, err := xml.Marshal(cmu)
+ if err != nil {
+ return fmt.Errorf("marshal CompleteMultipartUpload request: %v", err)
+ }
+
+ req, err := http.NewRequest("POST", m.CompleteURL, bytes.NewReader(body))
+ if err != nil {
+ return fmt.Errorf("create CompleteMultipartUpload request: %v", err)
+ }
+ req.ContentLength = int64(len(body))
+ req.Header.Set("Content-Type", "application/xml")
+ req = req.WithContext(ctx)
+
+ resp, err := httpClient.Do(req)
+ if err != nil {
+ return fmt.Errorf("CompleteMultipartUpload request %q: %v", mask.URL(m.CompleteURL), err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return fmt.Errorf("CompleteMultipartUpload request %v returned: %s", mask.URL(m.CompleteURL), resp.Status)
+ }
+
+ result := &compoundCompleteMultipartUploadResult{}
+ decoder := xml.NewDecoder(resp.Body)
+ if err := decoder.Decode(&result); err != nil {
+ return fmt.Errorf("decode CompleteMultipartUpload answer: %v", err)
+ }
+
+ if result.isError() {
+ return result
+ }
+
+ if result.CompleteMultipartUploadResult == nil {
+ return fmt.Errorf("empty CompleteMultipartUploadResult")
+ }
+
+ m.etag = extractETag(result.ETag)
+
+ return nil
+}
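A rough usage sketch of the Multipart type added above. It is illustrative only and not part of this diff: the presigned URLs, headers, and part size are hypothetical placeholders that, in a real request, GitLab Rails pre-signs and hands to Workhorse.

package example

import (
	"context"
	"io"
	"time"

	"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
)

// uploadMultipart streams body to object storage through presigned URLs.
func uploadMultipart(ctx context.Context, body io.Reader, partURLs []string, completeURL, abortURL, deleteURL string) (int64, error) {
	m, err := objectstore.NewMultipart(
		partURLs,    // presigned UploadPart URLs, one per part
		completeURL, // presigned CompleteMultipartUpload URL
		abortURL,    // presigned AbortMultipartUpload URL
		deleteURL,   // presigned DeleteObject URL used for cleanup
		map[string]string{"Content-Type": "application/octet-stream"},
		16*1024*1024, // partSize: bytes buffered on disk per part
	)
	if err != nil {
		return 0, err
	}

	// Consume buffers and uploads each part, sends the final
	// CompleteMultipartUpload, and aborts the multipart upload
	// if anything goes wrong along the way.
	return m.Consume(ctx, body, time.Now().Add(time.Hour))
}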
diff --git a/workhorse/internal/objectstore/multipart_test.go b/workhorse/internal/objectstore/multipart_test.go
new file mode 100644
index 00000000000..00d6efc0982
--- /dev/null
+++ b/workhorse/internal/objectstore/multipart_test.go
@@ -0,0 +1,64 @@
+package objectstore_test
+
+import (
+ "context"
+ "io/ioutil"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test"
+)
+
+func TestMultipartUploadWithUpcaseETags(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ var putCnt, postCnt int
+
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ _, err := ioutil.ReadAll(r.Body)
+ require.NoError(t, err)
+ defer r.Body.Close()
+
+ // Part upload request
+ if r.Method == "PUT" {
+ putCnt++
+
+ w.Header().Set("ETag", strings.ToUpper(test.ObjectMD5))
+ }
+
+ // POST with CompleteMultipartUpload request
+ if r.Method == "POST" {
+ completeBody := `<CompleteMultipartUploadResult>
+ <Bucket>test-bucket</Bucket>
+ <ETag>No Longer Checked</ETag>
+ </CompleteMultipartUploadResult>`
+ postCnt++
+
+ w.Write([]byte(completeBody))
+ }
+ }))
+ defer ts.Close()
+
+ deadline := time.Now().Add(testTimeout)
+
+ m, err := objectstore.NewMultipart(
+ []string{ts.URL}, // a single presigned part URL
+ ts.URL, // the complete multipart upload URL
+ "", // no abort
+ "", // no delete
+ map[string]string{}, // no custom headers
+ test.ObjectSize) // parts size equal to the whole content. Only 1 part
+ require.NoError(t, err)
+
+ _, err = m.Consume(ctx, strings.NewReader(test.ObjectContent), deadline)
+ require.NoError(t, err)
+ require.Equal(t, 1, putCnt, "1 part expected")
+ require.Equal(t, 1, postCnt, "1 complete multipart upload expected")
+}
diff --git a/workhorse/internal/objectstore/object.go b/workhorse/internal/objectstore/object.go
new file mode 100644
index 00000000000..eaf3bfb2e36
--- /dev/null
+++ b/workhorse/internal/objectstore/object.go
@@ -0,0 +1,114 @@
+package objectstore
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net"
+ "net/http"
+ "time"
+
+ "gitlab.com/gitlab-org/labkit/correlation"
+ "gitlab.com/gitlab-org/labkit/mask"
+ "gitlab.com/gitlab-org/labkit/tracing"
+)
+
+// httpTransport defines an http.Transport with values
+// that are more restrictive than those of http.DefaultTransport:
+// a shorter TLS handshake timeout and more aggressive connection closing,
+// to prevent connections from hanging and to reduce FD usage
+var httpTransport = tracing.NewRoundTripper(correlation.NewInstrumentedRoundTripper(&http.Transport{
+ Proxy: http.ProxyFromEnvironment,
+ DialContext: (&net.Dialer{
+ Timeout: 30 * time.Second,
+ KeepAlive: 10 * time.Second,
+ }).DialContext,
+ MaxIdleConns: 2,
+ IdleConnTimeout: 30 * time.Second,
+ TLSHandshakeTimeout: 10 * time.Second,
+ ExpectContinueTimeout: 10 * time.Second,
+ ResponseHeaderTimeout: 30 * time.Second,
+}))
+
+var httpClient = &http.Client{
+ Transport: httpTransport,
+}
+
+// Object represents an object on an S3 compatible Object Store service.
+// It can be used as io.WriteCloser for uploading an object
+type Object struct {
+ // putURL is a presigned URL for PutObject
+ putURL string
+ // deleteURL is a presigned URL for RemoveObject
+ deleteURL string
+ putHeaders map[string]string
+ size int64
+ etag string
+ metrics bool
+
+ *uploader
+}
+
+type StatusCodeError error
+
+// NewObject opens an HTTP connection to Object Store and returns an Object pointer that can be used for uploading.
+func NewObject(putURL, deleteURL string, putHeaders map[string]string, size int64) (*Object, error) {
+ return newObject(putURL, deleteURL, putHeaders, size, true)
+}
+
+func newObject(putURL, deleteURL string, putHeaders map[string]string, size int64, metrics bool) (*Object, error) {
+ o := &Object{
+ putURL: putURL,
+ deleteURL: deleteURL,
+ putHeaders: putHeaders,
+ size: size,
+ metrics: metrics,
+ }
+
+ o.uploader = newETagCheckUploader(o, metrics)
+ return o, nil
+}
+
+func (o *Object) Upload(ctx context.Context, r io.Reader) error {
+ // wrap the reader in a NopCloser: we must prevent the HTTP client from calling pr.Close(), otherwise it may shadow an error set with pr.CloseWithError(err)
+ req, err := http.NewRequest(http.MethodPut, o.putURL, ioutil.NopCloser(r))
+
+ if err != nil {
+ return fmt.Errorf("PUT %q: %v", mask.URL(o.putURL), err)
+ }
+ req.ContentLength = o.size
+
+ for k, v := range o.putHeaders {
+ req.Header.Set(k, v)
+ }
+
+ resp, err := httpClient.Do(req)
+ if err != nil {
+ return fmt.Errorf("PUT request %q: %v", mask.URL(o.putURL), err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ if o.metrics {
+ objectStorageUploadRequestsInvalidStatus.Inc()
+ }
+ return StatusCodeError(fmt.Errorf("PUT request %v returned: %s", mask.URL(o.putURL), resp.Status))
+ }
+
+ o.etag = extractETag(resp.Header.Get("ETag"))
+
+ return nil
+}
+
+func (o *Object) ETag() string {
+ return o.etag
+}
+
+func (o *Object) Abort() {
+ o.Delete()
+}
+
+func (o *Object) Delete() {
+ deleteURL(o.deleteURL)
+}
diff --git a/workhorse/internal/objectstore/object_test.go b/workhorse/internal/objectstore/object_test.go
new file mode 100644
index 00000000000..2ec45520e97
--- /dev/null
+++ b/workhorse/internal/objectstore/object_test.go
@@ -0,0 +1,155 @@
+package objectstore_test
+
+import (
+ "context"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test"
+)
+
+const testTimeout = 10 * time.Second
+
+type osFactory func() (*test.ObjectstoreStub, *httptest.Server)
+
+func testObjectUploadNoErrors(t *testing.T, startObjectStore osFactory, useDeleteURL bool, contentType string) {
+ osStub, ts := startObjectStore()
+ defer ts.Close()
+
+ objectURL := ts.URL + test.ObjectPath
+ var deleteURL string
+ if useDeleteURL {
+ deleteURL = objectURL
+ }
+
+ putHeaders := map[string]string{"Content-Type": contentType}
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ deadline := time.Now().Add(testTimeout)
+ object, err := objectstore.NewObject(objectURL, deleteURL, putHeaders, test.ObjectSize)
+ require.NoError(t, err)
+
+ // copy data
+ n, err := object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline)
+ require.NoError(t, err)
+ require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch")
+
+ require.Equal(t, contentType, osStub.GetHeader(test.ObjectPath, "Content-Type"))
+
+ // Checking MD5 extraction
+ require.Equal(t, osStub.GetObjectMD5(test.ObjectPath), object.ETag())
+
+ // Checking cleanup
+ cancel()
+ require.Equal(t, 1, osStub.PutsCnt(), "Object hasn't been uploaded")
+
+ var expectedDeleteCnt int
+ if useDeleteURL {
+ expectedDeleteCnt = 1
+ }
+ // Poll because the object removal is async
+ for i := 0; i < 100; i++ {
+ if osStub.DeletesCnt() == expectedDeleteCnt {
+ break
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+
+ if useDeleteURL {
+ require.Equal(t, 1, osStub.DeletesCnt(), "Object hasn't been deleted")
+ } else {
+ require.Equal(t, 0, osStub.DeletesCnt(), "Object has been deleted")
+ }
+}
+
+func TestObjectUpload(t *testing.T) {
+ t.Run("with delete URL", func(t *testing.T) {
+ testObjectUploadNoErrors(t, test.StartObjectStore, true, "application/octet-stream")
+ })
+ t.Run("without delete URL", func(t *testing.T) {
+ testObjectUploadNoErrors(t, test.StartObjectStore, false, "application/octet-stream")
+ })
+ t.Run("with custom content type", func(t *testing.T) {
+ testObjectUploadNoErrors(t, test.StartObjectStore, false, "image/jpeg")
+ })
+ t.Run("with upcase ETAG", func(t *testing.T) {
+ factory := func() (*test.ObjectstoreStub, *httptest.Server) {
+ md5s := map[string]string{
+ test.ObjectPath: strings.ToUpper(test.ObjectMD5),
+ }
+
+ return test.StartObjectStoreWithCustomMD5(md5s)
+ }
+
+ testObjectUploadNoErrors(t, factory, false, "application/octet-stream")
+ })
+}
+
+func TestObjectUpload404(t *testing.T) {
+ ts := httptest.NewServer(http.NotFoundHandler())
+ defer ts.Close()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ deadline := time.Now().Add(testTimeout)
+ objectURL := ts.URL + test.ObjectPath
+ object, err := objectstore.NewObject(objectURL, "", map[string]string{}, test.ObjectSize)
+ require.NoError(t, err)
+ _, err = object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline)
+
+ require.Error(t, err)
+ _, isStatusCodeError := err.(objectstore.StatusCodeError)
+ require.True(t, isStatusCodeError, "Should fail with StatusCodeError")
+ require.Contains(t, err.Error(), "404")
+}
+
+type endlessReader struct{}
+
+func (e *endlessReader) Read(p []byte) (n int, err error) {
+ for i := 0; i < len(p); i++ {
+ p[i] = '*'
+ }
+
+ return len(p), nil
+}
+
+// TestObjectUploadBrokenConnection's purpose is to ensure that errors caused by the upload destination get propagated back correctly.
+// This is important for troubleshooting in production.
+func TestObjectUploadBrokenConnection(t *testing.T) {
+ // This test server closes connection immediately
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ hj, ok := w.(http.Hijacker)
+ if !ok {
+ require.FailNow(t, "webserver doesn't support hijacking")
+ }
+ conn, _, err := hj.Hijack()
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ conn.Close()
+ }))
+ defer ts.Close()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ deadline := time.Now().Add(testTimeout)
+ objectURL := ts.URL + test.ObjectPath
+ object, err := objectstore.NewObject(objectURL, "", map[string]string{}, -1)
+ require.NoError(t, err)
+
+ _, copyErr := object.Consume(ctx, &endlessReader{}, deadline)
+ require.Error(t, copyErr)
+ require.NotEqual(t, io.ErrClosedPipe, copyErr, "We are shadowing the real error")
+}
diff --git a/workhorse/internal/objectstore/prometheus.go b/workhorse/internal/objectstore/prometheus.go
new file mode 100644
index 00000000000..20762fb52bc
--- /dev/null
+++ b/workhorse/internal/objectstore/prometheus.go
@@ -0,0 +1,39 @@
+package objectstore
+
+import (
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+var (
+ objectStorageUploadRequests = promauto.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitlab_workhorse_object_storage_upload_requests",
+ Help: "How many object storage requests have been processed",
+ },
+ []string{"status"},
+ )
+ objectStorageUploadsOpen = promauto.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "gitlab_workhorse_object_storage_upload_open",
+ Help: "Describes many object storage requests are open now",
+ },
+ )
+ objectStorageUploadBytes = promauto.NewCounter(
+ prometheus.CounterOpts{
+ Name: "gitlab_workhorse_object_storage_upload_bytes",
+ Help: "How many bytes were sent to object storage",
+ },
+ )
+ objectStorageUploadTime = promauto.NewHistogram(
+ prometheus.HistogramOpts{
+ Name: "gitlab_workhorse_object_storage_upload_time",
+ Help: "How long it took to upload objects",
+ Buckets: objectStorageUploadTimeBuckets,
+ })
+
+ objectStorageUploadRequestsRequestFailed = objectStorageUploadRequests.WithLabelValues("request-failed")
+ objectStorageUploadRequestsInvalidStatus = objectStorageUploadRequests.WithLabelValues("invalid-status")
+
+ objectStorageUploadTimeBuckets = []float64{.1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100}
+)
diff --git a/workhorse/internal/objectstore/s3_complete_multipart_api.go b/workhorse/internal/objectstore/s3_complete_multipart_api.go
new file mode 100644
index 00000000000..b84f5757f49
--- /dev/null
+++ b/workhorse/internal/objectstore/s3_complete_multipart_api.go
@@ -0,0 +1,51 @@
+package objectstore
+
+import (
+ "encoding/xml"
+ "fmt"
+)
+
+// CompleteMultipartUpload is the S3 CompleteMultipartUpload body
+type CompleteMultipartUpload struct {
+ Part []*completeMultipartUploadPart
+}
+
+type completeMultipartUploadPart struct {
+ PartNumber int
+ ETag string
+}
+
+// CompleteMultipartUploadResult is the S3 answer to CompleteMultipartUpload request
+type CompleteMultipartUploadResult struct {
+ Location string
+ Bucket string
+ Key string
+ ETag string
+}
+
+// CompleteMultipartUploadError is the in-body error structure
+// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html#mpUploadComplete-examples
+// the answer contains other fields we are not using
+type CompleteMultipartUploadError struct {
+ XMLName xml.Name `xml:"Error"`
+ Code string
+ Message string
+}
+
+func (c *CompleteMultipartUploadError) Error() string {
+ return fmt.Sprintf("CompleteMultipartUpload remote error %q: %s", c.Code, c.Message)
+}
+
+// compoundCompleteMultipartUploadResult holds both CompleteMultipartUploadResult and CompleteMultipartUploadError
+// this allows us to deserialize the response body where the root element can be either Error or CompleteMultipartUploadResult
+type compoundCompleteMultipartUploadResult struct {
+ *CompleteMultipartUploadResult
+ *CompleteMultipartUploadError
+
+ // XMLName overrides the CompleteMultipartUploadError.XMLName tag
+ XMLName xml.Name
+}
+
+func (c *compoundCompleteMultipartUploadResult) isError() bool {
+ return c.CompleteMultipartUploadError != nil
+}
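A minimal sketch, in the style of the package's own tests, of how the compound result above is meant to behave when the remote answers with either root element. The XML bodies are trimmed, hypothetical examples; real S3 answers carry more fields.

package objectstore

import (
	"encoding/xml"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestCompoundCompleteMultipartUploadResultSketch(t *testing.T) {
	// Success answer: the embedded CompleteMultipartUploadResult gets populated.
	success := `<CompleteMultipartUploadResult><Bucket>b</Bucket><Key>k</Key><ETag>"etag"</ETag></CompleteMultipartUploadResult>`
	var ok compoundCompleteMultipartUploadResult
	require.NoError(t, xml.Unmarshal([]byte(success), &ok))
	require.False(t, ok.isError())
	require.Equal(t, "etag", extractETag(ok.ETag))

	// In-body error answer: only the embedded CompleteMultipartUploadError is set,
	// so isError() reports it and the compound value can be returned as an error.
	failure := `<Error><Code>InternalError</Code><Message>boom</Message></Error>`
	var ko compoundCompleteMultipartUploadResult
	require.NoError(t, xml.Unmarshal([]byte(failure), &ko))
	require.True(t, ko.isError())
	require.Contains(t, ko.Error(), "InternalError")
}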
diff --git a/workhorse/internal/objectstore/s3_object.go b/workhorse/internal/objectstore/s3_object.go
new file mode 100644
index 00000000000..1f79f88224f
--- /dev/null
+++ b/workhorse/internal/objectstore/s3_object.go
@@ -0,0 +1,119 @@
+package objectstore
+
+import (
+ "context"
+ "io"
+ "time"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/awserr"
+ "github.com/aws/aws-sdk-go/service/s3"
+ "github.com/aws/aws-sdk-go/service/s3/s3manager"
+ "gitlab.com/gitlab-org/labkit/log"
+
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
+)
+
+type S3Object struct {
+ credentials config.S3Credentials
+ config config.S3Config
+ objectName string
+ uploaded bool
+
+ *uploader
+}
+
+func NewS3Object(objectName string, s3Credentials config.S3Credentials, s3Config config.S3Config) (*S3Object, error) {
+ o := &S3Object{
+ credentials: s3Credentials,
+ config: s3Config,
+ objectName: objectName,
+ }
+
+ o.uploader = newUploader(o)
+ return o, nil
+}
+
+func setEncryptionOptions(input *s3manager.UploadInput, s3Config config.S3Config) {
+ if s3Config.ServerSideEncryption != "" {
+ input.ServerSideEncryption = aws.String(s3Config.ServerSideEncryption)
+
+ if s3Config.ServerSideEncryption == s3.ServerSideEncryptionAwsKms && s3Config.SSEKMSKeyID != "" {
+ input.SSEKMSKeyId = aws.String(s3Config.SSEKMSKeyID)
+ }
+ }
+}
+
+func (s *S3Object) Upload(ctx context.Context, r io.Reader) error {
+ sess, err := setupS3Session(s.credentials, s.config)
+ if err != nil {
+ log.WithError(err).Error("error creating S3 session")
+ return err
+ }
+
+ uploader := s3manager.NewUploader(sess)
+
+ input := &s3manager.UploadInput{
+ Bucket: aws.String(s.config.Bucket),
+ Key: aws.String(s.objectName),
+ Body: r,
+ }
+
+ setEncryptionOptions(input, s.config)
+
+ _, err = uploader.UploadWithContext(ctx, input)
+ if err != nil {
+ log.WithError(err).Error("error uploading S3 session")
+ // Get the root cause, such as ErrEntityTooLarge, so we can return the proper HTTP status code
+ return unwrapAWSError(err)
+ }
+
+ s.uploaded = true
+
+ return nil
+}
+
+func (s *S3Object) ETag() string {
+ return ""
+}
+
+func (s *S3Object) Abort() {
+ s.Delete()
+}
+
+func (s *S3Object) Delete() {
+ if !s.uploaded {
+ return
+ }
+
+ session, err := setupS3Session(s.credentials, s.config)
+ if err != nil {
+ log.WithError(err).Error("error setting up S3 session in delete")
+ return
+ }
+
+ svc := s3.New(session)
+ input := &s3.DeleteObjectInput{
+ Bucket: aws.String(s.config.Bucket),
+ Key: aws.String(s.objectName),
+ }
+
+ // Note we can't use the request context because in a successful
+ // case, the original request has already completed.
+ deleteCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // lint:allow context.Background
+ defer cancel()
+
+ _, err = svc.DeleteObjectWithContext(deleteCtx, input)
+ if err != nil {
+ log.WithError(err).Error("error deleting S3 object", err)
+ }
+}
+
+// This is needed until https://github.com/aws/aws-sdk-go/issues/2820 is closed.
+func unwrapAWSError(e error) error {
+ if awsErr, ok := e.(awserr.Error); ok {
+ return unwrapAWSError(awsErr.OrigErr())
+ }
+
+ return e
+}
diff --git a/workhorse/internal/objectstore/s3_object_test.go b/workhorse/internal/objectstore/s3_object_test.go
new file mode 100644
index 00000000000..d9ebbd7f979
--- /dev/null
+++ b/workhorse/internal/objectstore/s3_object_test.go
@@ -0,0 +1,174 @@
+package objectstore_test
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/aws/aws-sdk-go/aws/awserr"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "github.com/aws/aws-sdk-go/service/s3"
+ "github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/testhelper"
+)
+
+type failedReader struct {
+ io.Reader
+}
+
+func (r *failedReader) Read(p []byte) (int, error) {
+ origErr := fmt.Errorf("entity is too large")
+ return 0, awserr.New("Read", "read failed", origErr)
+}
+
+func TestS3ObjectUpload(t *testing.T) {
+ testCases := []struct {
+ encryption string
+ }{
+ {encryption: ""},
+ {encryption: s3.ServerSideEncryptionAes256},
+ {encryption: s3.ServerSideEncryptionAwsKms},
+ }
+
+ for _, tc := range testCases {
+ t.Run(fmt.Sprintf("encryption=%v", tc.encryption), func(t *testing.T) {
+ creds, config, sess, ts := test.SetupS3(t, tc.encryption)
+ defer ts.Close()
+
+ deadline := time.Now().Add(testTimeout)
+ tmpDir, err := ioutil.TempDir("", "workhorse-test-")
+ require.NoError(t, err)
+ defer os.Remove(tmpDir)
+
+ objectName := filepath.Join(tmpDir, "s3-test-data")
+ ctx, cancel := context.WithCancel(context.Background())
+
+ object, err := objectstore.NewS3Object(objectName, creds, config)
+ require.NoError(t, err)
+
+ // copy data
+ n, err := object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline)
+ require.NoError(t, err)
+ require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch")
+
+ test.S3ObjectExists(t, sess, config, objectName, test.ObjectContent)
+ test.CheckS3Metadata(t, sess, config, objectName)
+
+ cancel()
+
+ testhelper.Retry(t, 5*time.Second, func() error {
+ if test.S3ObjectDoesNotExist(t, sess, config, objectName) {
+ return nil
+ }
+
+ return fmt.Errorf("file is still present")
+ })
+ })
+ }
+}
+
+func TestConcurrentS3ObjectUpload(t *testing.T) {
+ creds, uploadsConfig, uploadsSession, uploadServer := test.SetupS3WithBucket(t, "uploads", "")
+ defer uploadServer.Close()
+
+ // This will return a separate S3 endpoint
+ _, artifactsConfig, artifactsSession, artifactsServer := test.SetupS3WithBucket(t, "artifacts", "")
+ defer artifactsServer.Close()
+
+ deadline := time.Now().Add(testTimeout)
+ tmpDir, err := ioutil.TempDir("", "workhorse-test-")
+ require.NoError(t, err)
+ defer os.Remove(tmpDir)
+
+ var wg sync.WaitGroup
+
+ for i := 0; i < 4; i++ {
+ wg.Add(1)
+
+ go func(index int) {
+ var sess *session.Session
+ var config config.S3Config
+
+ if index%2 == 0 {
+ sess = uploadsSession
+ config = uploadsConfig
+ } else {
+ sess = artifactsSession
+ config = artifactsConfig
+ }
+
+ name := fmt.Sprintf("s3-test-data-%d", index)
+ objectName := filepath.Join(tmpDir, name)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ object, err := objectstore.NewS3Object(objectName, creds, config)
+ require.NoError(t, err)
+
+ // copy data
+ n, err := object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline)
+ require.NoError(t, err)
+ require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch")
+
+ test.S3ObjectExists(t, sess, config, objectName, test.ObjectContent)
+ wg.Done()
+ }(i)
+ }
+
+ wg.Wait()
+}
+
+func TestS3ObjectUploadCancel(t *testing.T) {
+ creds, config, _, ts := test.SetupS3(t, "")
+ defer ts.Close()
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ deadline := time.Now().Add(testTimeout)
+ tmpDir, err := ioutil.TempDir("", "workhorse-test-")
+ require.NoError(t, err)
+ defer os.Remove(tmpDir)
+
+ objectName := filepath.Join(tmpDir, "s3-test-data")
+
+ object, err := objectstore.NewS3Object(objectName, creds, config)
+
+ require.NoError(t, err)
+
+ // Cancel the transfer before the data has been copied to ensure
+ // we handle this gracefully.
+ cancel()
+
+ _, err = object.Consume(ctx, strings.NewReader(test.ObjectContent), deadline)
+ require.Error(t, err)
+ require.Equal(t, "context canceled", err.Error())
+}
+
+func TestS3ObjectUploadLimitReached(t *testing.T) {
+ creds, config, _, ts := test.SetupS3(t, "")
+ defer ts.Close()
+
+ deadline := time.Now().Add(testTimeout)
+ tmpDir, err := ioutil.TempDir("", "workhorse-test-")
+ require.NoError(t, err)
+ defer os.Remove(tmpDir)
+
+ objectName := filepath.Join(tmpDir, "s3-test-data")
+ object, err := objectstore.NewS3Object(objectName, creds, config)
+ require.NoError(t, err)
+
+ _, err = object.Consume(context.Background(), &failedReader{}, deadline)
+ require.Error(t, err)
+ require.Equal(t, "entity is too large", err.Error())
+}
diff --git a/workhorse/internal/objectstore/s3_session.go b/workhorse/internal/objectstore/s3_session.go
new file mode 100644
index 00000000000..ebc8daf534c
--- /dev/null
+++ b/workhorse/internal/objectstore/s3_session.go
@@ -0,0 +1,94 @@
+package objectstore
+
+import (
+ "sync"
+ "time"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/credentials"
+ "github.com/aws/aws-sdk-go/aws/session"
+
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
+)
+
+type s3Session struct {
+ session *session.Session
+ expiry time.Time
+}
+
+type s3SessionCache struct {
+ // An S3 session is cached by its input configuration (e.g. region,
+ // endpoint, path style, etc.), but the bucket is actually
+ // determined by the type of object to be uploaded (e.g. CI
+ // artifact, LFS, etc.) during runtime. In practice, we should only
+ // need one session per Workhorse process if we only allow one
+ // configuration for many different buckets. However, using a map
+ // indexed by the config avoids potential pitfalls in case the
+ // bucket configuration is supplied at startup or we need to support
+ // multiple S3 endpoints.
+ sessions map[config.S3Config]*s3Session
+ sync.Mutex
+}
+
+func (s *s3Session) isExpired() bool {
+ return time.Now().After(s.expiry)
+}
+
+func newS3SessionCache() *s3SessionCache {
+ return &s3SessionCache{sessions: make(map[config.S3Config]*s3Session)}
+}
+
+var (
+ // By default, it looks like IAM instance profiles may last 6 hours
+ // (via curl http://169.254.169.254/latest/meta-data/iam/security-credentials/<role_name>),
+ // but this may be configurable to anywhere from 15 minutes to 12
+ // hours. To be safe, refresh AWS sessions every 10 minutes.
+ sessionExpiration = time.Duration(10 * time.Minute)
+ sessionCache = newS3SessionCache()
+)
+
+// setupS3Session initializes a new AWS S3 session and refreshes one if
+// necessary. As recommended in https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html,
+// sessions should be cached when possible. Sessions are safe to use
+// concurrently as long as the session isn't modified.
+func setupS3Session(s3Credentials config.S3Credentials, s3Config config.S3Config) (*session.Session, error) {
+ sessionCache.Lock()
+ defer sessionCache.Unlock()
+
+ if s, ok := sessionCache.sessions[s3Config]; ok && !s.isExpired() {
+ return s.session, nil
+ }
+
+ cfg := &aws.Config{
+ Region: aws.String(s3Config.Region),
+ S3ForcePathStyle: aws.Bool(s3Config.PathStyle),
+ }
+
+ // In case IAM profiles aren't being used, use the static credentials
+ if s3Credentials.AwsAccessKeyID != "" && s3Credentials.AwsSecretAccessKey != "" {
+ cfg.Credentials = credentials.NewStaticCredentials(s3Credentials.AwsAccessKeyID, s3Credentials.AwsSecretAccessKey, "")
+ }
+
+ if s3Config.Endpoint != "" {
+ cfg.Endpoint = aws.String(s3Config.Endpoint)
+ }
+
+ sess, err := session.NewSession(cfg)
+ if err != nil {
+ return nil, err
+ }
+
+ sessionCache.sessions[s3Config] = &s3Session{
+ expiry: time.Now().Add(sessionExpiration),
+ session: sess,
+ }
+
+ return sess, nil
+}
+
+func ResetS3Session(s3Config config.S3Config) {
+ sessionCache.Lock()
+ defer sessionCache.Unlock()
+
+ delete(sessionCache.sessions, s3Config)
+}
diff --git a/workhorse/internal/objectstore/s3_session_test.go b/workhorse/internal/objectstore/s3_session_test.go
new file mode 100644
index 00000000000..8601f305917
--- /dev/null
+++ b/workhorse/internal/objectstore/s3_session_test.go
@@ -0,0 +1,57 @@
+package objectstore
+
+import (
+ "testing"
+ "time"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
+)
+
+func TestS3SessionSetup(t *testing.T) {
+ credentials := config.S3Credentials{}
+ cfg := config.S3Config{Region: "us-west-1", PathStyle: true}
+
+ sess, err := setupS3Session(credentials, cfg)
+ require.NoError(t, err)
+
+ require.Equal(t, aws.StringValue(sess.Config.Region), "us-west-1")
+ require.True(t, aws.BoolValue(sess.Config.S3ForcePathStyle))
+
+ require.Equal(t, len(sessionCache.sessions), 1)
+ anotherConfig := cfg
+ _, err = setupS3Session(credentials, anotherConfig)
+ require.NoError(t, err)
+ require.Equal(t, len(sessionCache.sessions), 1)
+
+ ResetS3Session(cfg)
+}
+
+func TestS3SessionExpiry(t *testing.T) {
+ credentials := config.S3Credentials{}
+ cfg := config.S3Config{Region: "us-west-1", PathStyle: true}
+
+ sess, err := setupS3Session(credentials, cfg)
+ require.NoError(t, err)
+
+ require.Equal(t, aws.StringValue(sess.Config.Region), "us-west-1")
+ require.True(t, aws.BoolValue(sess.Config.S3ForcePathStyle))
+
+ firstSession, ok := sessionCache.sessions[cfg]
+ require.True(t, ok)
+ require.False(t, firstSession.isExpired())
+
+ firstSession.expiry = time.Now().Add(-1 * time.Second)
+ require.True(t, firstSession.isExpired())
+
+ _, err = setupS3Session(credentials, cfg)
+ require.NoError(t, err)
+
+ nextSession, ok := sessionCache.sessions[cfg]
+ require.True(t, ok)
+ require.False(t, nextSession.isExpired())
+
+ ResetS3Session(cfg)
+}
diff --git a/workhorse/internal/objectstore/test/consts.go b/workhorse/internal/objectstore/test/consts.go
new file mode 100644
index 00000000000..7a1bcc28d45
--- /dev/null
+++ b/workhorse/internal/objectstore/test/consts.go
@@ -0,0 +1,19 @@
+package test
+
+// Some useful constants for testing purposes
+const (
+ // ObjectContent is example textual content
+ ObjectContent = "TEST OBJECT CONTENT"
+ // ObjectSize is the ObjectContent size
+ ObjectSize = int64(len(ObjectContent))
+ // ObjectPath is an example remote object path (including the bucket name)
+ ObjectPath = "/bucket/object"
+ // ObjectMD5 is ObjectContent MD5 hash
+ ObjectMD5 = "42d000eea026ee0760677e506189cb33"
+ // ObjectSHA1 is ObjectContent SHA1 hash
+ ObjectSHA1 = "173cfd58c6b60cb910f68a26cbb77e3fc5017a6d"
+ // ObjectSHA256 is ObjectContent SHA256 hash
+ ObjectSHA256 = "b0257e9e657ef19b15eed4fbba975bd5238d651977564035ef91cb45693647aa"
+ // ObjectSHA512 is ObjectContent SHA512 hash
+ ObjectSHA512 = "51af8197db2047f7894652daa7437927bf831d5aa63f1b0b7277c4800b06f5e3057251f0e4c2d344ca8c2daf1ffc08a28dd3b2f5fe0e316d3fd6c3af58c34b97"
+)
diff --git a/workhorse/internal/objectstore/test/gocloud_stub.go b/workhorse/internal/objectstore/test/gocloud_stub.go
new file mode 100644
index 00000000000..cf22075e407
--- /dev/null
+++ b/workhorse/internal/objectstore/test/gocloud_stub.go
@@ -0,0 +1,47 @@
+package test
+
+import (
+ "context"
+ "io/ioutil"
+ "net/url"
+ "os"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gocloud.dev/blob"
+ "gocloud.dev/blob/fileblob"
+)
+
+type dirOpener struct {
+ tmpDir string
+}
+
+func (o *dirOpener) OpenBucketURL(ctx context.Context, u *url.URL) (*blob.Bucket, error) {
+ return fileblob.OpenBucket(o.tmpDir, nil)
+}
+
+func SetupGoCloudFileBucket(t *testing.T, scheme string) (m *blob.URLMux, bucketDir string, cleanup func()) {
+ tmpDir, err := ioutil.TempDir("", "")
+ require.NoError(t, err)
+
+ mux := new(blob.URLMux)
+ fake := &dirOpener{tmpDir: tmpDir}
+ mux.RegisterBucket(scheme, fake)
+ cleanup = func() {
+ os.RemoveAll(tmpDir)
+ }
+
+ return mux, tmpDir, cleanup
+}
+
+func GoCloudObjectExists(t *testing.T, bucketDir string, objectName string) {
+ bucket, err := fileblob.OpenBucket(bucketDir, nil)
+ require.NoError(t, err)
+
+ ctx, cancel := context.WithCancel(context.Background()) // lint:allow context.Background
+ defer cancel()
+
+ exists, err := bucket.Exists(ctx, objectName)
+ require.NoError(t, err)
+ require.True(t, exists)
+}
diff --git a/workhorse/internal/objectstore/test/objectstore_stub.go b/workhorse/internal/objectstore/test/objectstore_stub.go
new file mode 100644
index 00000000000..31ef4913305
--- /dev/null
+++ b/workhorse/internal/objectstore/test/objectstore_stub.go
@@ -0,0 +1,278 @@
+package test
+
+import (
+ "crypto/md5"
+ "encoding/hex"
+ "encoding/xml"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "net/http/httptest"
+ "strconv"
+ "strings"
+ "sync"
+
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
+)
+
+type partsEtagMap map[int]string
+
+// ObjectstoreStub is a testing implementation of ObjectStore.
+// Instead of storing objects it will just save their md5sum.
+type ObjectstoreStub struct {
+ // bucket contains the md5sum of uploaded objects
+ bucket map[string]string
+ // overwriteMD5 contains overwrites for the md5sum that should be returned instead of the regular hash
+ overwriteMD5 map[string]string
+ // multipart is a map of MultipartUploads
+ multipart map[string]partsEtagMap
+ // HTTP headers sent along with each request
+ headers map[string]*http.Header
+
+ puts int
+ deletes int
+
+ m sync.Mutex
+}
+
+// StartObjectStore will start an ObjectStore stub
+func StartObjectStore() (*ObjectstoreStub, *httptest.Server) {
+ return StartObjectStoreWithCustomMD5(make(map[string]string))
+}
+
+// StartObjectStoreWithCustomMD5 will start an ObjectStore stub: md5Hashes contains overwrites for the md5sum that should be returned on PutObject
+func StartObjectStoreWithCustomMD5(md5Hashes map[string]string) (*ObjectstoreStub, *httptest.Server) {
+ os := &ObjectstoreStub{
+ bucket: make(map[string]string),
+ multipart: make(map[string]partsEtagMap),
+ overwriteMD5: make(map[string]string),
+ headers: make(map[string]*http.Header),
+ }
+
+ for k, v := range md5Hashes {
+ os.overwriteMD5[k] = v
+ }
+
+ return os, httptest.NewServer(os)
+}
+
+// PutsCnt counts PutObject invocations
+func (o *ObjectstoreStub) PutsCnt() int {
+ o.m.Lock()
+ defer o.m.Unlock()
+
+ return o.puts
+}
+
+// DeletesCnt counts DeleteObject invocation of a valid object
+func (o *ObjectstoreStub) DeletesCnt() int {
+ o.m.Lock()
+ defer o.m.Unlock()
+
+ return o.deletes
+}
+
+// GetObjectMD5 returns the calculated MD5 of the object uploaded to path.
+// It will return an empty string if no object has been uploaded to that path.
+func (o *ObjectstoreStub) GetObjectMD5(path string) string {
+ o.m.Lock()
+ defer o.m.Unlock()
+
+ return o.bucket[path]
+}
+
+// GetHeader returns a given HTTP header of the object uploaded to the path
+func (o *ObjectstoreStub) GetHeader(path, key string) string {
+ o.m.Lock()
+ defer o.m.Unlock()
+
+ if val, ok := o.headers[path]; ok {
+ return val.Get(key)
+ }
+
+ return ""
+}
+
+// InitiateMultipartUpload prepares the ObjectstoreStub to receive a MultipartUpload on path.
+// It will return an error if a MultipartUpload is already in progress on that path.
+// InitiateMultipartUpload is only used during test setup.
+// Workhorse's production code does not know how to initiate a multipart upload.
+//
+// Real S3 multipart uploads are more complicated than what we do here,
+// but this is enough to verify that workhorse's production code behaves as intended.
+func (o *ObjectstoreStub) InitiateMultipartUpload(path string) error {
+ o.m.Lock()
+ defer o.m.Unlock()
+
+ if o.multipart[path] != nil {
+ return fmt.Errorf("MultipartUpload for %q already in progress", path)
+ }
+
+ o.multipart[path] = make(partsEtagMap)
+ return nil
+}
+
+// IsMultipartUpload checks whether the given path has a MultipartUpload in progress
+func (o *ObjectstoreStub) IsMultipartUpload(path string) bool {
+ o.m.Lock()
+ defer o.m.Unlock()
+
+ return o.isMultipartUpload(path)
+}
+
+// isMultipartUpload is the lock free version of IsMultipartUpload
+func (o *ObjectstoreStub) isMultipartUpload(path string) bool {
+ return o.multipart[path] != nil
+}
+
+func (o *ObjectstoreStub) removeObject(w http.ResponseWriter, r *http.Request) {
+ o.m.Lock()
+ defer o.m.Unlock()
+
+ objectPath := r.URL.Path
+ if o.isMultipartUpload(objectPath) {
+ o.deletes++
+ delete(o.multipart, objectPath)
+
+ w.WriteHeader(200)
+ } else if _, ok := o.bucket[objectPath]; ok {
+ o.deletes++
+ delete(o.bucket, objectPath)
+
+ w.WriteHeader(200)
+ } else {
+ w.WriteHeader(404)
+ }
+}
+
+func (o *ObjectstoreStub) putObject(w http.ResponseWriter, r *http.Request) {
+ o.m.Lock()
+ defer o.m.Unlock()
+
+ objectPath := r.URL.Path
+
+ etag, overwritten := o.overwriteMD5[objectPath]
+ if !overwritten {
+ hasher := md5.New()
+ io.Copy(hasher, r.Body)
+
+ checksum := hasher.Sum(nil)
+ etag = hex.EncodeToString(checksum)
+ }
+
+ o.headers[objectPath] = &r.Header
+ o.puts++
+ if o.isMultipartUpload(objectPath) {
+ pNumber := r.URL.Query().Get("partNumber")
+ idx, err := strconv.Atoi(pNumber)
+ if err != nil {
+ http.Error(w, fmt.Sprintf("malformed partNumber: %v", err), 400)
+ return
+ }
+
+ o.multipart[objectPath][idx] = etag
+ } else {
+ o.bucket[objectPath] = etag
+ }
+
+ w.Header().Set("ETag", etag)
+ w.WriteHeader(200)
+}
+
+func MultipartUploadInternalError() *objectstore.CompleteMultipartUploadError {
+ return &objectstore.CompleteMultipartUploadError{Code: "InternalError", Message: "malformed object path"}
+}
+
+func (o *ObjectstoreStub) completeMultipartUpload(w http.ResponseWriter, r *http.Request) {
+ o.m.Lock()
+ defer o.m.Unlock()
+
+ objectPath := r.URL.Path
+
+ multipart := o.multipart[objectPath]
+ if multipart == nil {
+ http.Error(w, "Unknown MultipartUpload", 404)
+ return
+ }
+
+ buf, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ http.Error(w, err.Error(), 500)
+ return
+ }
+
+ var msg objectstore.CompleteMultipartUpload
+ err = xml.Unmarshal(buf, &msg)
+ if err != nil {
+ http.Error(w, err.Error(), 400)
+ return
+ }
+
+ for _, part := range msg.Part {
+ etag := multipart[part.PartNumber]
+ if etag != part.ETag {
+ msg := fmt.Sprintf("ETag mismatch on part %d. Expected %q got %q", part.PartNumber, etag, part.ETag)
+ http.Error(w, msg, 400)
+ return
+ }
+ }
+
+ etag, overwritten := o.overwriteMD5[objectPath]
+ if !overwritten {
+ etag = "CompleteMultipartUploadETag"
+ }
+
+ o.bucket[objectPath] = etag
+ delete(o.multipart, objectPath)
+
+ w.Header().Set("ETag", etag)
+ split := strings.SplitN(objectPath[1:], "/", 2)
+ if len(split) < 2 {
+ encodeXMLAnswer(w, MultipartUploadInternalError())
+ return
+ }
+
+ bucket := split[0]
+ key := split[1]
+ answer := objectstore.CompleteMultipartUploadResult{
+ Location: r.URL.String(),
+ Bucket: bucket,
+ Key: key,
+ ETag: etag,
+ }
+ encodeXMLAnswer(w, answer)
+}
+
+func encodeXMLAnswer(w http.ResponseWriter, answer interface{}) {
+ w.Header().Set("Content-Type", "text/xml")
+
+ enc := xml.NewEncoder(w)
+ if err := enc.Encode(answer); err != nil {
+ http.Error(w, err.Error(), 500)
+ }
+}
+
+func (o *ObjectstoreStub) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if r.Body != nil {
+ defer r.Body.Close()
+ }
+
+ fmt.Println("ObjectStore Stub:", r.Method, r.URL.String())
+
+ if r.URL.Path == "" {
+ http.Error(w, "No path provided", 404)
+ return
+ }
+
+ switch r.Method {
+ case "DELETE":
+ o.removeObject(w, r)
+ case "PUT":
+ o.putObject(w, r)
+ case "POST":
+ o.completeMultipartUpload(w, r)
+ default:
+ w.WriteHeader(404)
+ }
+}
diff --git a/workhorse/internal/objectstore/test/objectstore_stub_test.go b/workhorse/internal/objectstore/test/objectstore_stub_test.go
new file mode 100644
index 00000000000..8c0d52a2d79
--- /dev/null
+++ b/workhorse/internal/objectstore/test/objectstore_stub_test.go
@@ -0,0 +1,167 @@
+package test
+
+import (
+ "fmt"
+ "io"
+ "net/http"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func doRequest(method, url string, body io.Reader) error {
+ req, err := http.NewRequest(method, url, body)
+ if err != nil {
+ return err
+ }
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return err
+ }
+
+ return resp.Body.Close()
+}
+
+func TestObjectStoreStub(t *testing.T) {
+ stub, ts := StartObjectStore()
+ defer ts.Close()
+
+ require.Equal(t, 0, stub.PutsCnt())
+ require.Equal(t, 0, stub.DeletesCnt())
+
+ objectURL := ts.URL + ObjectPath
+
+ require.NoError(t, doRequest(http.MethodPut, objectURL, strings.NewReader(ObjectContent)))
+
+ require.Equal(t, 1, stub.PutsCnt())
+ require.Equal(t, 0, stub.DeletesCnt())
+ require.Equal(t, ObjectMD5, stub.GetObjectMD5(ObjectPath))
+
+ require.NoError(t, doRequest(http.MethodDelete, objectURL, nil))
+
+ require.Equal(t, 1, stub.PutsCnt())
+ require.Equal(t, 1, stub.DeletesCnt())
+}
+
+func TestObjectStoreStubDelete404(t *testing.T) {
+ stub, ts := StartObjectStore()
+ defer ts.Close()
+
+ objectURL := ts.URL + ObjectPath
+
+ req, err := http.NewRequest(http.MethodDelete, objectURL, nil)
+ require.NoError(t, err)
+
+ resp, err := http.DefaultClient.Do(req)
+ require.NoError(t, err)
+ defer resp.Body.Close()
+ require.Equal(t, 404, resp.StatusCode)
+
+ require.Equal(t, 0, stub.DeletesCnt())
+}
+
+func TestObjectStoreInitiateMultipartUpload(t *testing.T) {
+ stub, ts := StartObjectStore()
+ defer ts.Close()
+
+ path := "/my-multipart"
+ err := stub.InitiateMultipartUpload(path)
+ require.NoError(t, err)
+
+ err = stub.InitiateMultipartUpload(path)
+ require.Error(t, err, "second attempt to open the same MultipartUpload")
+}
+
+func TestObjectStoreCompleteMultipartUpload(t *testing.T) {
+ stub, ts := StartObjectStore()
+ defer ts.Close()
+
+ objectURL := ts.URL + ObjectPath
+ parts := []struct {
+ number int
+ content string
+ contentMD5 string
+ }{
+ {
+ number: 1,
+ content: "first part",
+ contentMD5: "550cf6b6e60f65a0e3104a26e70fea42",
+ }, {
+ number: 2,
+ content: "second part",
+ contentMD5: "920b914bca0a70780b40881b8f376135",
+ },
+ }
+
+ stub.InitiateMultipartUpload(ObjectPath)
+
+ require.True(t, stub.IsMultipartUpload(ObjectPath))
+ require.Equal(t, 0, stub.PutsCnt())
+ require.Equal(t, 0, stub.DeletesCnt())
+
+ // Workhorse knows nothing about S3 MultipartUpload; it receives some URLs
+ // from GitLab Rails and PUTs a chunk of data to each of them.
+ // Then it completes the upload with a final POST.
+ partPutURLs := []string{
+ fmt.Sprintf("%s?partNumber=%d", objectURL, 1),
+ fmt.Sprintf("%s?partNumber=%d", objectURL, 2),
+ }
+ completePostURL := objectURL
+
+ for i, partPutURL := range partPutURLs {
+ part := parts[i]
+
+ require.NoError(t, doRequest(http.MethodPut, partPutURL, strings.NewReader(part.content)))
+
+ require.Equal(t, i+1, stub.PutsCnt())
+ require.Equal(t, 0, stub.DeletesCnt())
+ require.Equal(t, part.contentMD5, stub.multipart[ObjectPath][part.number], "Part %d was not uploaded into ObjectStorage", part.number)
+ require.Empty(t, stub.GetObjectMD5(ObjectPath), "Part %d was mistakenly uploaded as a single object", part.number)
+ require.True(t, stub.IsMultipartUpload(ObjectPath), "MultipartUpload completed or aborted")
+ }
+
+ completeBody := fmt.Sprintf(`<CompleteMultipartUpload>
+ <Part>
+ <PartNumber>1</PartNumber>
+ <ETag>%s</ETag>
+ </Part>
+ <Part>
+ <PartNumber>2</PartNumber>
+ <ETag>%s</ETag>
+ </Part>
+ </CompleteMultipartUpload>`, parts[0].contentMD5, parts[1].contentMD5)
+ require.NoError(t, doRequest(http.MethodPost, completePostURL, strings.NewReader(completeBody)))
+
+ require.Equal(t, len(parts), stub.PutsCnt())
+ require.Equal(t, 0, stub.DeletesCnt())
+ require.False(t, stub.IsMultipartUpload(ObjectPath), "MultipartUpload is still in progress")
+}
+
+func TestObjectStoreAbortMultipartUpload(t *testing.T) {
+ stub, ts := StartObjectStore()
+ defer ts.Close()
+
+ stub.InitiateMultipartUpload(ObjectPath)
+
+ require.True(t, stub.IsMultipartUpload(ObjectPath))
+ require.Equal(t, 0, stub.PutsCnt())
+ require.Equal(t, 0, stub.DeletesCnt())
+
+ objectURL := ts.URL + ObjectPath
+ require.NoError(t, doRequest(http.MethodPut, fmt.Sprintf("%s?partNumber=%d", objectURL, 1), strings.NewReader(ObjectContent)))
+
+ require.Equal(t, 1, stub.PutsCnt())
+ require.Equal(t, 0, stub.DeletesCnt())
+ require.Equal(t, ObjectMD5, stub.multipart[ObjectPath][1], "Part was not uploaded into ObjectStorage")
+ require.Empty(t, stub.GetObjectMD5(ObjectPath), "Part was mistakenly uploaded as a single object")
+ require.True(t, stub.IsMultipartUpload(ObjectPath), "MultipartUpload completed or aborted")
+
+ require.NoError(t, doRequest(http.MethodDelete, objectURL, nil))
+
+ require.Equal(t, 1, stub.PutsCnt())
+ require.Equal(t, 1, stub.DeletesCnt())
+ require.Empty(t, stub.GetObjectMD5(ObjectPath), "MultiUpload has been completed")
+ require.False(t, stub.IsMultipartUpload(ObjectPath), "MultiUpload is still in progress")
+}
diff --git a/workhorse/internal/objectstore/test/s3_stub.go b/workhorse/internal/objectstore/test/s3_stub.go
new file mode 100644
index 00000000000..36514b3b887
--- /dev/null
+++ b/workhorse/internal/objectstore/test/s3_stub.go
@@ -0,0 +1,142 @@
+package test
+
+import (
+ "io/ioutil"
+ "net/http/httptest"
+ "os"
+ "strings"
+ "testing"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/credentials"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
+
+ "github.com/aws/aws-sdk-go/service/s3"
+ "github.com/aws/aws-sdk-go/service/s3/s3manager"
+
+ "github.com/johannesboyne/gofakes3"
+ "github.com/johannesboyne/gofakes3/backend/s3mem"
+)
+
+func SetupS3(t *testing.T, encryption string) (config.S3Credentials, config.S3Config, *session.Session, *httptest.Server) {
+ return SetupS3WithBucket(t, "test-bucket", encryption)
+}
+
+func SetupS3WithBucket(t *testing.T, bucket string, encryption string) (config.S3Credentials, config.S3Config, *session.Session, *httptest.Server) {
+ backend := s3mem.New()
+ faker := gofakes3.New(backend)
+ ts := httptest.NewServer(faker.Server())
+
+ creds := config.S3Credentials{
+ AwsAccessKeyID: "YOUR-ACCESSKEYID",
+ AwsSecretAccessKey: "YOUR-SECRETACCESSKEY",
+ }
+
+ config := config.S3Config{
+ Bucket: bucket,
+ Endpoint: ts.URL,
+ Region: "eu-central-1",
+ PathStyle: true,
+ }
+
+ if encryption != "" {
+ config.ServerSideEncryption = encryption
+
+ if encryption == s3.ServerSideEncryptionAwsKms {
+ config.SSEKMSKeyID = "arn:aws:1234"
+ }
+ }
+
+ sess, err := session.NewSession(&aws.Config{
+ Credentials: credentials.NewStaticCredentials(creds.AwsAccessKeyID, creds.AwsSecretAccessKey, ""),
+ Endpoint: aws.String(ts.URL),
+ Region: aws.String(config.Region),
+ DisableSSL: aws.Bool(true),
+ S3ForcePathStyle: aws.Bool(true),
+ })
+ require.NoError(t, err)
+
+ // Create S3 service client
+ svc := s3.New(sess)
+
+ _, err = svc.CreateBucket(&s3.CreateBucketInput{
+ Bucket: aws.String(bucket),
+ })
+ require.NoError(t, err)
+
+ return creds, config, sess, ts
+}
+
+// S3ObjectExists will fail the test if the file does not exist.
+func S3ObjectExists(t *testing.T, sess *session.Session, config config.S3Config, objectName string, expectedBytes string) {
+ downloadObject(t, sess, config, objectName, func(tmpfile *os.File, numBytes int64, err error) {
+ require.NoError(t, err)
+ require.Equal(t, int64(len(expectedBytes)), numBytes)
+
+ output, err := ioutil.ReadFile(tmpfile.Name())
+ require.NoError(t, err)
+
+ require.Equal(t, []byte(expectedBytes), output)
+ })
+}
+
+func CheckS3Metadata(t *testing.T, sess *session.Session, config config.S3Config, objectName string) {
+ // In a real S3 provider, s3crypto.NewDecryptionClient should probably be used
+ svc := s3.New(sess)
+ result, err := svc.GetObject(&s3.GetObjectInput{
+ Bucket: aws.String(config.Bucket),
+ Key: aws.String(objectName),
+ })
+ require.NoError(t, err)
+
+ if config.ServerSideEncryption != "" {
+ require.Equal(t, aws.String(config.ServerSideEncryption), result.ServerSideEncryption)
+
+ if config.ServerSideEncryption == s3.ServerSideEncryptionAwsKms {
+ require.Equal(t, aws.String(config.SSEKMSKeyID), result.SSEKMSKeyId)
+ } else {
+ require.Nil(t, result.SSEKMSKeyId)
+ }
+ } else {
+ require.Nil(t, result.ServerSideEncryption)
+ require.Nil(t, result.SSEKMSKeyId)
+ }
+}
+
+// S3ObjectDoesNotExist returns true if the object has been deleted,
+// false otherwise. The return signature is different from
+// S3ObjectExists because deletion may need to be retried since deferred
+// clean up calls may cause the actual deletion to happen after the
+// initial check.
+func S3ObjectDoesNotExist(t *testing.T, sess *session.Session, config config.S3Config, objectName string) bool {
+ deleted := false
+
+ downloadObject(t, sess, config, objectName, func(tmpfile *os.File, numBytes int64, err error) {
+ if err != nil && strings.Contains(err.Error(), "NoSuchKey") {
+ deleted = true
+ }
+ })
+
+ return deleted
+}
+
+func downloadObject(t *testing.T, sess *session.Session, config config.S3Config, objectName string, handler func(tmpfile *os.File, numBytes int64, err error)) {
+ tmpDir, err := ioutil.TempDir("", "workhorse-test-")
+ require.NoError(t, err)
+ defer os.Remove(tmpDir)
+
+ tmpfile, err := ioutil.TempFile(tmpDir, "s3-output")
+ require.NoError(t, err)
+ defer os.Remove(tmpfile.Name())
+
+ downloadSvc := s3manager.NewDownloader(sess)
+ numBytes, err := downloadSvc.Download(tmpfile, &s3.GetObjectInput{
+ Bucket: aws.String(config.Bucket),
+ Key: aws.String(objectName),
+ })
+
+ handler(tmpfile, numBytes, err)
+}
diff --git a/workhorse/internal/objectstore/upload_strategy.go b/workhorse/internal/objectstore/upload_strategy.go
new file mode 100644
index 00000000000..5707ba5f24e
--- /dev/null
+++ b/workhorse/internal/objectstore/upload_strategy.go
@@ -0,0 +1,46 @@
+package objectstore
+
+import (
+ "context"
+ "io"
+ "net/http"
+
+ "gitlab.com/gitlab-org/labkit/log"
+ "gitlab.com/gitlab-org/labkit/mask"
+)
+
+type uploadStrategy interface {
+ Upload(ctx context.Context, r io.Reader) error
+ ETag() string
+ Abort()
+ Delete()
+}
+
+func deleteURL(url string) {
+ if url == "" {
+ return
+ }
+
+ req, err := http.NewRequest("DELETE", url, nil)
+ if err != nil {
+ log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed")
+ return
+ }
+ // TODO: consider adding the context to the outgoing request for better instrumentation
+
+ // here we are not using u.ctx because we must perform cleanup regardless of parent context
+ resp, err := httpClient.Do(req)
+ if err != nil {
+ log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed")
+ return
+ }
+ resp.Body.Close()
+}
+
+func extractETag(rawETag string) string {
+ if rawETag != "" && rawETag[0] == '"' {
+ rawETag = rawETag[1 : len(rawETag)-1]
+ }
+
+ return rawETag
+}
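For illustration, a tiny test-style sketch (not in this diff) of the quote stripping above: S3-compatible servers return the ETag header wrapped in double quotes, and extractETag removes them before the uploader compares the value against its locally computed MD5.

package objectstore

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestExtractETagSketch(t *testing.T) {
	// Quoted header value, as S3 returns it.
	require.Equal(t, "42d000eea026ee0760677e506189cb33", extractETag(`"42d000eea026ee0760677e506189cb33"`))
	// An already-bare value is returned unchanged.
	require.Equal(t, "42d000eea026ee0760677e506189cb33", extractETag("42d000eea026ee0760677e506189cb33"))
}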
diff --git a/workhorse/internal/objectstore/uploader.go b/workhorse/internal/objectstore/uploader.go
new file mode 100644
index 00000000000..aedfbe55ead
--- /dev/null
+++ b/workhorse/internal/objectstore/uploader.go
@@ -0,0 +1,115 @@
+package objectstore
+
+import (
+ "context"
+ "crypto/md5"
+ "encoding/hex"
+ "fmt"
+ "hash"
+ "io"
+ "strings"
+ "time"
+
+ "gitlab.com/gitlab-org/labkit/log"
+)
+
+// uploader consumes an io.Reader and uploads it using a pluggable uploadStrategy.
+type uploader struct {
+ strategy uploadStrategy
+
+ // In the case of S3 uploads, we have a multipart upload which
+ // instantiates uploads for the individual parts. We don't want to
+ // increment metrics for the individual parts, so that is why we have
+ // this boolean flag.
+ metrics bool
+
+ // With S3 we compare the MD5 of the data we sent with the ETag returned
+ // by the object storage server.
+ checkETag bool
+}
+
+func newUploader(strategy uploadStrategy) *uploader {
+ return &uploader{strategy: strategy, metrics: true}
+}
+
+func newETagCheckUploader(strategy uploadStrategy, metrics bool) *uploader {
+ return &uploader{strategy: strategy, metrics: metrics, checkETag: true}
+}
+
+func hexString(h hash.Hash) string { return hex.EncodeToString(h.Sum(nil)) }
+
+// Consume reads the reader until it reaches EOF or an error. It spawns a
+// goroutine that waits for outerCtx to be done, after which the remote
+// file is deleted. The deadline applies to the upload performed inside
+// Consume, not to outerCtx.
+func (u *uploader) Consume(outerCtx context.Context, reader io.Reader, deadline time.Time) (_ int64, err error) {
+ if u.metrics {
+ objectStorageUploadsOpen.Inc()
+ defer func(started time.Time) {
+ objectStorageUploadsOpen.Dec()
+ objectStorageUploadTime.Observe(time.Since(started).Seconds())
+ if err != nil {
+ objectStorageUploadRequestsRequestFailed.Inc()
+ }
+ }(time.Now())
+ }
+
+ defer func() {
+ // We do this mainly to abort S3 multipart uploads: it is not enough to
+ // "delete" them.
+ if err != nil {
+ u.strategy.Abort()
+ }
+ }()
+
+ go func() {
+ // Once gitlab-rails is done handling the request, we are supposed to
+ // delete the upload from its temporary location.
+ <-outerCtx.Done()
+ u.strategy.Delete()
+ }()
+
+ uploadCtx, cancelFn := context.WithDeadline(outerCtx, deadline)
+ defer cancelFn()
+
+ var hasher hash.Hash
+ if u.checkETag {
+ hasher = md5.New()
+ reader = io.TeeReader(reader, hasher)
+ }
+
+ cr := &countReader{r: reader}
+ if err := u.strategy.Upload(uploadCtx, cr); err != nil {
+ return cr.n, err
+ }
+
+ if u.checkETag {
+ if err := compareMD5(hexString(hasher), u.strategy.ETag()); err != nil {
+ log.ContextLogger(uploadCtx).WithError(err).Error("error comparing MD5 checksum")
+ return cr.n, err
+ }
+ }
+
+ objectStorageUploadBytes.Add(float64(cr.n))
+
+ return cr.n, nil
+}
+
+func compareMD5(local, remote string) error {
+ if !strings.EqualFold(local, remote) {
+ return fmt.Errorf("ETag mismatch. expected %q got %q", local, remote)
+ }
+
+ return nil
+}
+
+type countReader struct {
+ r io.Reader
+ n int64
+}
+
+func (cr *countReader) Read(p []byte) (int, error) {
+ nRead, err := cr.r.Read(p)
+ cr.n += int64(nRead)
+ return nRead, err
+}
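To show how the pieces above fit together, here is a hypothetical strategy (not part of this commit) that simply discards its data: a backend only needs to implement Upload, ETag, Abort, and Delete, wrap itself with newUploader, and let Consume drive the copy, metrics, and cleanup (newETagCheckUploader additionally verifies the ETag against the local MD5).

package objectstore

import (
	"context"
	"io"
	"io/ioutil"
	"time"
)

// discardStrategy pretends to store an object by draining the reader.
type discardStrategy struct{}

func (discardStrategy) Upload(ctx context.Context, r io.Reader) error {
	_, err := io.Copy(ioutil.Discard, r)
	return err
}

func (discardStrategy) ETag() string { return "" } // this backend does not verify ETags
func (discardStrategy) Abort()       {}            // nothing to abort
func (discardStrategy) Delete()      {}            // nothing to delete

// consumeToNowhere uploads r with the discard strategy and returns the byte count.
func consumeToNowhere(ctx context.Context, r io.Reader) (int64, error) {
	u := newUploader(discardStrategy{})
	return u.Consume(ctx, r, time.Now().Add(10*time.Second))
}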