Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'workhorse/internal/objectstore/multipart.go')
-rw-r--r--workhorse/internal/objectstore/multipart.go188
1 files changed, 188 insertions, 0 deletions
diff --git a/workhorse/internal/objectstore/multipart.go b/workhorse/internal/objectstore/multipart.go
new file mode 100644
index 00000000000..fd1c0ed487d
--- /dev/null
+++ b/workhorse/internal/objectstore/multipart.go
@@ -0,0 +1,188 @@
+package objectstore
+
+import (
+ "bytes"
+ "context"
+ "encoding/xml"
+ "errors"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "os"
+
+ "gitlab.com/gitlab-org/labkit/log"
+ "gitlab.com/gitlab-org/labkit/mask"
+)
+
+// ErrNotEnoughParts will be used when writing more than size * len(partURLs)
+var ErrNotEnoughParts = errors.New("not enough Parts")
+
+// Multipart represents a MultipartUpload on a S3 compatible Object Store service.
+// It can be used as io.WriteCloser for uploading an object
+type Multipart struct {
+ PartURLs []string
+ // CompleteURL is a presigned URL for CompleteMultipartUpload
+ CompleteURL string
+ // AbortURL is a presigned URL for AbortMultipartUpload
+ AbortURL string
+ // DeleteURL is a presigned URL for RemoveObject
+ DeleteURL string
+ PutHeaders map[string]string
+ partSize int64
+ etag string
+
+ *uploader
+}
+
+// NewMultipart provides Multipart pointer that can be used for uploading. Data written will be split buffered on disk up to size bytes
+// then uploaded with S3 Upload Part. Once Multipart is Closed a final call to CompleteMultipartUpload will be sent.
+// In case of any error a call to AbortMultipartUpload will be made to cleanup all the resources
+func NewMultipart(partURLs []string, completeURL, abortURL, deleteURL string, putHeaders map[string]string, partSize int64) (*Multipart, error) {
+ m := &Multipart{
+ PartURLs: partURLs,
+ CompleteURL: completeURL,
+ AbortURL: abortURL,
+ DeleteURL: deleteURL,
+ PutHeaders: putHeaders,
+ partSize: partSize,
+ }
+
+ m.uploader = newUploader(m)
+ return m, nil
+}
+
+func (m *Multipart) Upload(ctx context.Context, r io.Reader) error {
+ cmu := &CompleteMultipartUpload{}
+ for i, partURL := range m.PartURLs {
+ src := io.LimitReader(r, m.partSize)
+ part, err := m.readAndUploadOnePart(ctx, partURL, m.PutHeaders, src, i+1)
+ if err != nil {
+ return err
+ }
+ if part == nil {
+ break
+ } else {
+ cmu.Part = append(cmu.Part, part)
+ }
+ }
+
+ n, err := io.Copy(ioutil.Discard, r)
+ if err != nil {
+ return fmt.Errorf("drain pipe: %v", err)
+ }
+ if n > 0 {
+ return ErrNotEnoughParts
+ }
+
+ if err := m.complete(ctx, cmu); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (m *Multipart) ETag() string {
+ return m.etag
+}
+func (m *Multipart) Abort() {
+ deleteURL(m.AbortURL)
+}
+
+func (m *Multipart) Delete() {
+ deleteURL(m.DeleteURL)
+}
+
+func (m *Multipart) readAndUploadOnePart(ctx context.Context, partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) {
+ file, err := ioutil.TempFile("", "part-buffer")
+ if err != nil {
+ return nil, fmt.Errorf("create temporary buffer file: %v", err)
+ }
+ defer func(path string) {
+ if err := os.Remove(path); err != nil {
+ log.WithError(err).WithField("file", path).Warning("Unable to delete temporary file")
+ }
+ }(file.Name())
+
+ n, err := io.Copy(file, src)
+ if err != nil {
+ return nil, err
+ }
+ if n == 0 {
+ return nil, nil
+ }
+
+ if _, err = file.Seek(0, io.SeekStart); err != nil {
+ return nil, fmt.Errorf("rewind part %d temporary dump : %v", partNumber, err)
+ }
+
+ etag, err := m.uploadPart(ctx, partURL, putHeaders, file, n)
+ if err != nil {
+ return nil, fmt.Errorf("upload part %d: %v", partNumber, err)
+ }
+ return &completeMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil
+}
+
+func (m *Multipart) uploadPart(ctx context.Context, url string, headers map[string]string, body io.Reader, size int64) (string, error) {
+ deadline, ok := ctx.Deadline()
+ if !ok {
+ return "", fmt.Errorf("missing deadline")
+ }
+
+ part, err := newObject(url, "", headers, size, false)
+ if err != nil {
+ return "", err
+ }
+
+ if n, err := part.Consume(ctx, io.LimitReader(body, size), deadline); err != nil || n < size {
+ if err == nil {
+ err = io.ErrUnexpectedEOF
+ }
+ return "", err
+ }
+
+ return part.ETag(), nil
+}
+
+func (m *Multipart) complete(ctx context.Context, cmu *CompleteMultipartUpload) error {
+ body, err := xml.Marshal(cmu)
+ if err != nil {
+ return fmt.Errorf("marshal CompleteMultipartUpload request: %v", err)
+ }
+
+ req, err := http.NewRequest("POST", m.CompleteURL, bytes.NewReader(body))
+ if err != nil {
+ return fmt.Errorf("create CompleteMultipartUpload request: %v", err)
+ }
+ req.ContentLength = int64(len(body))
+ req.Header.Set("Content-Type", "application/xml")
+ req = req.WithContext(ctx)
+
+ resp, err := httpClient.Do(req)
+ if err != nil {
+ return fmt.Errorf("CompleteMultipartUpload request %q: %v", mask.URL(m.CompleteURL), err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return fmt.Errorf("CompleteMultipartUpload request %v returned: %s", mask.URL(m.CompleteURL), resp.Status)
+ }
+
+ result := &compoundCompleteMultipartUploadResult{}
+ decoder := xml.NewDecoder(resp.Body)
+ if err := decoder.Decode(&result); err != nil {
+ return fmt.Errorf("decode CompleteMultipartUpload answer: %v", err)
+ }
+
+ if result.isError() {
+ return result
+ }
+
+ if result.CompleteMultipartUploadResult == nil {
+ return fmt.Errorf("empty CompleteMultipartUploadResult")
+ }
+
+ m.etag = extractETag(result.ETag)
+
+ return nil
+}