Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'workhorse/internal/objectstore/uploader.go')
-rw-r--r--workhorse/internal/objectstore/uploader.go115
1 files changed, 115 insertions, 0 deletions
diff --git a/workhorse/internal/objectstore/uploader.go b/workhorse/internal/objectstore/uploader.go
new file mode 100644
index 00000000000..aedfbe55ead
--- /dev/null
+++ b/workhorse/internal/objectstore/uploader.go
@@ -0,0 +1,115 @@
+package objectstore
+
+import (
+ "context"
+ "crypto/md5"
+ "encoding/hex"
+ "fmt"
+ "hash"
+ "io"
+ "strings"
+ "time"
+
+ "gitlab.com/gitlab-org/labkit/log"
+)
+
+// uploader consumes an io.Reader and uploads it using a pluggable uploadStrategy.
+type uploader struct {
+ strategy uploadStrategy
+
+ // In the case of S3 uploads, we have a multipart upload which
+ // instantiates uploads for the individual parts. We don't want to
+ // increment metrics for the individual parts, so that is why we have
+ // this boolean flag.
+ metrics bool
+
+ // With S3 we compare the MD5 of the data we sent with the ETag returned
+ // by the object storage server.
+ checkETag bool
+}
+
+func newUploader(strategy uploadStrategy) *uploader {
+ return &uploader{strategy: strategy, metrics: true}
+}
+
+func newETagCheckUploader(strategy uploadStrategy, metrics bool) *uploader {
+ return &uploader{strategy: strategy, metrics: metrics, checkETag: true}
+}
+
+func hexString(h hash.Hash) string { return hex.EncodeToString(h.Sum(nil)) }
+
+// Consume reads the reader until it reaches EOF or an error. It spawns a
+// goroutine that waits for outerCtx to be done, after which the remote
+// file is deleted. The deadline applies to the upload performed inside
+// Consume, not to outerCtx.
+func (u *uploader) Consume(outerCtx context.Context, reader io.Reader, deadline time.Time) (_ int64, err error) {
+ if u.metrics {
+ objectStorageUploadsOpen.Inc()
+ defer func(started time.Time) {
+ objectStorageUploadsOpen.Dec()
+ objectStorageUploadTime.Observe(time.Since(started).Seconds())
+ if err != nil {
+ objectStorageUploadRequestsRequestFailed.Inc()
+ }
+ }(time.Now())
+ }
+
+ defer func() {
+ // We do this mainly to abort S3 multipart uploads: it is not enough to
+ // "delete" them.
+ if err != nil {
+ u.strategy.Abort()
+ }
+ }()
+
+ go func() {
+ // Once gitlab-rails is done handling the request, we are supposed to
+ // delete the upload from its temporary location.
+ <-outerCtx.Done()
+ u.strategy.Delete()
+ }()
+
+ uploadCtx, cancelFn := context.WithDeadline(outerCtx, deadline)
+ defer cancelFn()
+
+ var hasher hash.Hash
+ if u.checkETag {
+ hasher = md5.New()
+ reader = io.TeeReader(reader, hasher)
+ }
+
+ cr := &countReader{r: reader}
+ if err := u.strategy.Upload(uploadCtx, cr); err != nil {
+ return cr.n, err
+ }
+
+ if u.checkETag {
+ if err := compareMD5(hexString(hasher), u.strategy.ETag()); err != nil {
+ log.ContextLogger(uploadCtx).WithError(err).Error("error comparing MD5 checksum")
+ return cr.n, err
+ }
+ }
+
+ objectStorageUploadBytes.Add(float64(cr.n))
+
+ return cr.n, nil
+}
+
+func compareMD5(local, remote string) error {
+ if !strings.EqualFold(local, remote) {
+ return fmt.Errorf("ETag mismatch. expected %q got %q", local, remote)
+ }
+
+ return nil
+}
+
+type countReader struct {
+ r io.Reader
+ n int64
+}
+
+func (cr *countReader) Read(p []byte) (int, error) {
+ nRead, err := cr.r.Read(p)
+ cr.n += int64(nRead)
+ return nRead, err
+}