diff options
Diffstat (limited to 'workhorse/internal/objectstore/uploader.go')
-rw-r--r-- | workhorse/internal/objectstore/uploader.go | 115 |
1 files changed, 115 insertions, 0 deletions
diff --git a/workhorse/internal/objectstore/uploader.go b/workhorse/internal/objectstore/uploader.go new file mode 100644 index 00000000000..aedfbe55ead --- /dev/null +++ b/workhorse/internal/objectstore/uploader.go @@ -0,0 +1,115 @@ +package objectstore + +import ( + "context" + "crypto/md5" + "encoding/hex" + "fmt" + "hash" + "io" + "strings" + "time" + + "gitlab.com/gitlab-org/labkit/log" +) + +// uploader consumes an io.Reader and uploads it using a pluggable uploadStrategy. +type uploader struct { + strategy uploadStrategy + + // In the case of S3 uploads, we have a multipart upload which + // instantiates uploads for the individual parts. We don't want to + // increment metrics for the individual parts, so that is why we have + // this boolean flag. + metrics bool + + // With S3 we compare the MD5 of the data we sent with the ETag returned + // by the object storage server. + checkETag bool +} + +func newUploader(strategy uploadStrategy) *uploader { + return &uploader{strategy: strategy, metrics: true} +} + +func newETagCheckUploader(strategy uploadStrategy, metrics bool) *uploader { + return &uploader{strategy: strategy, metrics: metrics, checkETag: true} +} + +func hexString(h hash.Hash) string { return hex.EncodeToString(h.Sum(nil)) } + +// Consume reads the reader until it reaches EOF or an error. It spawns a +// goroutine that waits for outerCtx to be done, after which the remote +// file is deleted. The deadline applies to the upload performed inside +// Consume, not to outerCtx. +func (u *uploader) Consume(outerCtx context.Context, reader io.Reader, deadline time.Time) (_ int64, err error) { + if u.metrics { + objectStorageUploadsOpen.Inc() + defer func(started time.Time) { + objectStorageUploadsOpen.Dec() + objectStorageUploadTime.Observe(time.Since(started).Seconds()) + if err != nil { + objectStorageUploadRequestsRequestFailed.Inc() + } + }(time.Now()) + } + + defer func() { + // We do this mainly to abort S3 multipart uploads: it is not enough to + // "delete" them. + if err != nil { + u.strategy.Abort() + } + }() + + go func() { + // Once gitlab-rails is done handling the request, we are supposed to + // delete the upload from its temporary location. + <-outerCtx.Done() + u.strategy.Delete() + }() + + uploadCtx, cancelFn := context.WithDeadline(outerCtx, deadline) + defer cancelFn() + + var hasher hash.Hash + if u.checkETag { + hasher = md5.New() + reader = io.TeeReader(reader, hasher) + } + + cr := &countReader{r: reader} + if err := u.strategy.Upload(uploadCtx, cr); err != nil { + return cr.n, err + } + + if u.checkETag { + if err := compareMD5(hexString(hasher), u.strategy.ETag()); err != nil { + log.ContextLogger(uploadCtx).WithError(err).Error("error comparing MD5 checksum") + return cr.n, err + } + } + + objectStorageUploadBytes.Add(float64(cr.n)) + + return cr.n, nil +} + +func compareMD5(local, remote string) error { + if !strings.EqualFold(local, remote) { + return fmt.Errorf("ETag mismatch. expected %q got %q", local, remote) + } + + return nil +} + +type countReader struct { + r io.Reader + n int64 +} + +func (cr *countReader) Read(p []byte) (int, error) { + nRead, err := cr.r.Read(p) + cr.n += int64(nRead) + return nRead, err +} |