diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2022-03-18 23:02:30 +0300 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2022-03-18 23:02:30 +0300 |
commit | 41fe97390ceddf945f3d967b8fdb3de4c66b7dea (patch) | |
tree | 9c8d89a8624828992f06d892cd2f43818ff5dcc8 /workhorse/internal/upload/destination/objectstore/multipart.go | |
parent | 0804d2dc31052fb45a1efecedc8e06ce9bc32862 (diff) |
Add latest changes from gitlab-org/gitlab@14-9-stable-eev14.9.0-rc42
Diffstat (limited to 'workhorse/internal/upload/destination/objectstore/multipart.go')
-rw-r--r-- | workhorse/internal/upload/destination/objectstore/multipart.go | 187 |
1 file changed, 187 insertions, 0 deletions
diff --git a/workhorse/internal/upload/destination/objectstore/multipart.go b/workhorse/internal/upload/destination/objectstore/multipart.go new file mode 100644 index 00000000000..4c5b64b27ee --- /dev/null +++ b/workhorse/internal/upload/destination/objectstore/multipart.go @@ -0,0 +1,187 @@ +package objectstore + +import ( + "bytes" + "context" + "encoding/xml" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "os" + + "gitlab.com/gitlab-org/labkit/mask" +) + +// ErrNotEnoughParts will be used when writing more than size * len(partURLs) +var ErrNotEnoughParts = errors.New("not enough Parts") + +// Multipart represents a MultipartUpload on a S3 compatible Object Store service. +// It can be used as io.WriteCloser for uploading an object +type Multipart struct { + PartURLs []string + // CompleteURL is a presigned URL for CompleteMultipartUpload + CompleteURL string + // AbortURL is a presigned URL for AbortMultipartUpload + AbortURL string + // DeleteURL is a presigned URL for RemoveObject + DeleteURL string + PutHeaders map[string]string + partSize int64 + etag string + + *uploader +} + +// NewMultipart provides Multipart pointer that can be used for uploading. Data written will be split buffered on disk up to size bytes +// then uploaded with S3 Upload Part. Once Multipart is Closed a final call to CompleteMultipartUpload will be sent. 
+// In case of any error a call to AbortMultipartUpload will be made to cleanup all the resources +func NewMultipart(partURLs []string, completeURL, abortURL, deleteURL string, putHeaders map[string]string, partSize int64) (*Multipart, error) { + m := &Multipart{ + PartURLs: partURLs, + CompleteURL: completeURL, + AbortURL: abortURL, + DeleteURL: deleteURL, + PutHeaders: putHeaders, + partSize: partSize, + } + + m.uploader = newUploader(m) + return m, nil +} + +func (m *Multipart) Upload(ctx context.Context, r io.Reader) error { + cmu := &CompleteMultipartUpload{} + for i, partURL := range m.PartURLs { + src := io.LimitReader(r, m.partSize) + part, err := m.readAndUploadOnePart(ctx, partURL, m.PutHeaders, src, i+1) + if err != nil { + return err + } + if part == nil { + break + } else { + cmu.Part = append(cmu.Part, part) + } + } + + n, err := io.Copy(ioutil.Discard, r) + if err != nil { + return fmt.Errorf("drain pipe: %v", err) + } + if n > 0 { + return ErrNotEnoughParts + } + + if err := m.complete(ctx, cmu); err != nil { + return err + } + + return nil +} + +func (m *Multipart) ETag() string { + return m.etag +} +func (m *Multipart) Abort() { + deleteURL(m.AbortURL) +} + +func (m *Multipart) Delete() { + deleteURL(m.DeleteURL) +} + +func (m *Multipart) readAndUploadOnePart(ctx context.Context, partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) { + file, err := ioutil.TempFile("", "part-buffer") + if err != nil { + return nil, fmt.Errorf("create temporary buffer file: %v", err) + } + defer file.Close() + + if err := os.Remove(file.Name()); err != nil { + return nil, err + } + + n, err := io.Copy(file, src) + if err != nil { + return nil, err + } + if n == 0 { + return nil, nil + } + + if _, err = file.Seek(0, io.SeekStart); err != nil { + return nil, fmt.Errorf("rewind part %d temporary dump : %v", partNumber, err) + } + + etag, err := m.uploadPart(ctx, partURL, putHeaders, file, n) + if err != nil 
{ + return nil, fmt.Errorf("upload part %d: %v", partNumber, err) + } + return &completeMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil +} + +func (m *Multipart) uploadPart(ctx context.Context, url string, headers map[string]string, body io.Reader, size int64) (string, error) { + deadline, ok := ctx.Deadline() + if !ok { + return "", fmt.Errorf("missing deadline") + } + + part, err := newObject(url, "", headers, size, false) + if err != nil { + return "", err + } + + if n, err := part.Consume(ctx, io.LimitReader(body, size), deadline); err != nil || n < size { + if err == nil { + err = io.ErrUnexpectedEOF + } + return "", err + } + + return part.ETag(), nil +} + +func (m *Multipart) complete(ctx context.Context, cmu *CompleteMultipartUpload) error { + body, err := xml.Marshal(cmu) + if err != nil { + return fmt.Errorf("marshal CompleteMultipartUpload request: %v", err) + } + + req, err := http.NewRequest("POST", m.CompleteURL, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("create CompleteMultipartUpload request: %v", err) + } + req.ContentLength = int64(len(body)) + req.Header.Set("Content-Type", "application/xml") + req = req.WithContext(ctx) + + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("CompleteMultipartUpload request %q: %v", mask.URL(m.CompleteURL), err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("CompleteMultipartUpload request %v returned: %s", mask.URL(m.CompleteURL), resp.Status) + } + + result := &compoundCompleteMultipartUploadResult{} + decoder := xml.NewDecoder(resp.Body) + if err := decoder.Decode(&result); err != nil { + return fmt.Errorf("decode CompleteMultipartUpload answer: %v", err) + } + + if result.isError() { + return result + } + + if result.CompleteMultipartUploadResult == nil { + return fmt.Errorf("empty CompleteMultipartUploadResult") + } + + m.etag = extractETag(result.ETag) + + return nil +} |