diff options
author | Kamil Trzciński <ayufan@ayufan.eu> | 2020-08-20 18:31:30 +0300 |
---|---|---|
committer | Kamil Trzciński <ayufan@ayufan.eu> | 2020-08-21 11:32:06 +0300 |
commit | f59aec9d526f05bde08f5c90cc4a1c4c1d8766eb (patch) | |
tree | b7a1d8562e54a62261390f14d6f6bd99fea44863 | |
parent | 71a39de011a7b2d19470c32ed8f73afc372636bf (diff) |
Refactor ZIP support to be more OOM (branch: zip-using-vfs)
-rw-r--r-- | internal/serving/disk/serving.go | 2 | ||||
-rw-r--r-- | internal/vfs/zip/archive.go | 77 | ||||
-rw-r--r-- | internal/vfs/zip/archive_test.go | 2 | ||||
-rw-r--r-- | internal/vfs/zip/http_range/http_read_at.go | 54 | ||||
-rw-r--r-- | internal/vfs/zip/http_range/http_reader.go | 129 | ||||
-rw-r--r-- | internal/vfs/zip/http_range/resource.go | 70 | ||||
-rw-r--r-- | internal/vfs/zip/http_read_at.go | 58 | ||||
-rw-r--r-- | internal/vfs/zip/http_reader.go | 81 | ||||
-rw-r--r-- | internal/vfs/zip/http_reader_test.go | 47 | ||||
-rw-r--r-- | internal/vfs/zip/http_zip.go | 74 |
10 files changed, 294 insertions, 300 deletions
diff --git a/internal/serving/disk/serving.go b/internal/serving/disk/serving.go index a27b1bc4..535e03cb 100644 --- a/internal/serving/disk/serving.go +++ b/internal/serving/disk/serving.go @@ -15,7 +15,7 @@ var disk = &Disk{ reader: Reader{ fileSizeMetric: metrics.DiskServingFileSize, vfs: map[string]vfs.VFS{ - "": localVFS, + "": localVFS, // default to use if not specified "local": localVFS, "zip": vfs.Instrumented(zip.New(), "zip"), }, diff --git a/internal/vfs/zip/archive.go b/internal/vfs/zip/archive.go index 3a53c417..627c59f2 100644 --- a/internal/vfs/zip/archive.go +++ b/internal/vfs/zip/archive.go @@ -12,29 +12,27 @@ import ( "sync" "gitlab.com/gitlab-org/gitlab-pages/internal/vfs" + "gitlab.com/gitlab-org/gitlab-pages/internal/vfs/zip/http_range" ) const dirPrefix = "public/" const maxSymlinkSize = 256 type zipArchive struct { - path string - once sync.Once - done chan struct{} - zip *zip.Reader - zipCloser io.Closer - files map[string]*zip.File - zipErr error + path string + once sync.Once + done chan struct{} + + resource *http_range.Resource + reader *http_range.ReadAtReader + archive *zip.Reader + err error + + files map[string]*zip.File } func (a *zipArchive) openArchive(ctx context.Context) error { - a.once.Do(func() { - a.zip, a.zipCloser, a.zipErr = openZIPArchive(a.path) - if a.zip != nil { - a.processZip() - } - close(a.done) - }) + a.once.Do(a.readArchive) // wait for it to close // or exit early @@ -42,31 +40,41 @@ func (a *zipArchive) openArchive(ctx context.Context) error { case <-a.done: case <-ctx.Done(): } - return a.zipErr + return a.err } -func (a *zipArchive) processZip() { - for _, file := range a.zip.File { - if !strings.HasPrefix(file.Name, dirPrefix) { - continue +func (a *zipArchive) readArchive() { + a.resource, a.err = http_range.NewResource(context.Background(), a.path) + if a.err != nil { + return + } + + a.reader = http_range.NewReadAt(a.resource) + a.reader.WithCachedReader(func() { + a.archive, a.err = 
zip.NewReader(a.reader, a.resource.Size) + }) + + if a.archive != nil { + for _, file := range a.archive.File { + if !strings.HasPrefix(file.Name, dirPrefix) { + continue + } + a.files[file.Name] = file } - a.files[file.Name] = file + + // recycle memory + a.archive.File = nil } - // recycle memory - a.zip.File = nil + close(a.done) } func (a *zipArchive) close() { - if a.zipCloser != nil { - a.zipCloser.Close() - } - a.zipCloser = nil - a.zip = nil + // no-op: everything can be GC recycled } func (a *zipArchive) findFile(name string) *zip.File { - name = filepath.Join("public", name) + name = filepath.Join(dirPrefix, name) if file := a.files[name]; file != nil { return file @@ -128,17 +136,8 @@ func (a *zipArchive) Open(ctx context.Context, name string) (vfs.File, error) { } // TODO: We can support `io.Seeker` if file would not be compressed - - if !isHTTPArchive(a.path) { - return file.Open() - } - - var reader io.ReadCloser - reader = &httpReader{ - URL: a.path, - Off: dataOffset, - N: int64(file.UncompressedSize64), - } + var reader vfs.File + reader = a.reader.SectionReader(dataOffset, int64(file.CompressedSize64)) switch file.Method { case zip.Deflate: diff --git a/internal/vfs/zip/archive_test.go b/internal/vfs/zip/archive_test.go index d550b635..fe7b0d50 100644 --- a/internal/vfs/zip/archive_test.go +++ b/internal/vfs/zip/archive_test.go @@ -9,6 +9,8 @@ import ( "github.com/stretchr/testify/require" ) +const URL = "http://192.168.88.233:9000/test-bucket/doc-gitlab-com.zip?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=TEST_KEY%2F20200820%2F%2Fs3%2Faws4_request&X-Amz-Date=20200820T152420Z&X-Amz-Expires=432000&X-Amz-SignedHeaders=host&X-Amz-Signature=fcf49604f53564ce1648e5a0c2d8f1186ba3d9dd5e40d2c3244c57053e0348e9" + func TestOpenArchive(t *testing.T) { zip := newArchive(URL) defer zip.close() diff --git a/internal/vfs/zip/http_range/http_read_at.go b/internal/vfs/zip/http_range/http_read_at.go new file mode 100644 index 00000000..85d6205e --- /dev/null 
+++ b/internal/vfs/zip/http_range/http_read_at.go @@ -0,0 +1,54 @@ +package http_range + +import ( + "io" +) + +type ReadAtReader struct { + R *Resource + cachedReader *Reader +} + +func (h *ReadAtReader) cachedRead(p []byte, off int64) (n int, err error) { + if !h.cachedReader.WithinRange(off, int64(len(p))) { + h.cachedReader.Close() + h.cachedReader = NewReader(h.R, off, h.R.Size-off) + } + + return io.ReadFull(h.cachedReader, p) +} + +func (h *ReadAtReader) ephemeralRead(p []byte, off int64) (n int, err error) { + reader := NewReader(h.R, off, int64(len(p))) + defer reader.Close() + + return io.ReadFull(reader, p) +} + +func (h *ReadAtReader) SectionReader(off, n int64) *Reader { + return NewReader(h.R, off, n) +} + +func (h *ReadAtReader) ReadAt(p []byte, off int64) (n int, err error) { + if h.cachedReader != nil { + return h.cachedRead(p, off) + } + + return h.ephemeralRead(p, off) +} + +func (h *ReadAtReader) WithCachedReader(fn func()) { + h.cachedReader = NewReader(h.R, 0, h.R.Size) + + defer func() { + h.cachedReader.Close() + h.cachedReader = nil + }() + + fn() +} + +// NewReadAt creates a ReadAt object on a given resource +func NewReadAt(resource *Resource) *ReadAtReader { + return &ReadAtReader{R: resource} +} diff --git a/internal/vfs/zip/http_range/http_reader.go b/internal/vfs/zip/http_range/http_reader.go new file mode 100644 index 00000000..e9db1255 --- /dev/null +++ b/internal/vfs/zip/http_range/http_reader.go @@ -0,0 +1,129 @@ +package http_range + +import ( + "errors" + "fmt" + "io" + "net/http" + "time" + + "gitlab.com/gitlab-org/gitlab-pages/internal/httptransport" + "gitlab.com/gitlab-org/gitlab-pages/metrics" +) + +var ( + // ErrRangeRequestsNotSupported is returned by Seek and Read + // when the remote server does not allow range requests (Accept-Ranges was not set) + ErrRangeRequestsNotSupported = errors.New("range requests are not supported by the remote server") + // ErrInvalidRange is returned by Read when trying to read past the end 
of the file + ErrInvalidRange = errors.New("invalid range") + // ErrContentHasChanged is returned by Read when the content has changed since the first request + ErrContentHasChanged = errors.New("content has changed since first request") +) + +type Reader struct { + R *Resource + offset, size int64 + res *http.Response +} + +var httpClient = &http.Client{ + // TODO: we need connect timeout + // The longest time the request can be executed + Timeout: 30 * time.Minute, + Transport: httptransport.NewTransportWithMetrics(metrics.ZIPHttpReaderReqDuration, metrics.ZIPHttpReaderReqTotal), +} + +func (h *Reader) ensureRequest() (err error) { + if h.res != nil { + return nil + } + + if h.offset < 0 || h.size < 0 || h.offset+h.size > h.R.Size { + return ErrInvalidRange + } + + req, err := http.NewRequest("GET", h.R.URL, nil) + if err != nil { + return err + } + + if h.R.LastModified != "" { + req.Header.Set("If-Range", h.R.LastModified) + } else if h.R.Etag != "" { + req.Header.Set("If-Range", h.R.Etag) + } + + req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", h.offset, h.offset+h.size-1)) + + res, err := httpClient.Do(req) + if err != nil { + return err + } + + // cleanup body on failure to avoid memory leak + defer func() { + if err != nil { + res.Body.Close() + } + }() + + switch res.StatusCode { + case http.StatusOK: + // some servers return 200 OK for bytes=0- + if h.offset > 0 || h.R.Etag != "" && h.R.Etag != res.Header.Get("ETag") { + return ErrContentHasChanged + } + break + + case http.StatusPartialContent: + break + + case http.StatusRequestedRangeNotSatisfiable: + return ErrRangeRequestsNotSupported + + default: + return fmt.Errorf("failed with %d: %q", res.StatusCode, res.Status) + } + + h.res = res + return nil +} + +// WithinRange checks if a given data can be read efficiently +func (h *Reader) WithinRange(offset, n int64) bool { + return h.offset == offset && n <= h.size +} + +// Read reads a data into a given buffer +func (h *Reader) Read(p []byte) (int, 
error) { + if len(p) == 0 { + return 0, nil + } + + if err := h.ensureRequest(); err != nil { + return 0, err + } + + n, err := h.res.Body.Read(p) + + if err == nil || err == io.EOF { + h.offset += int64(n) + h.size -= int64(n) + } + return n, err +} + +// Close closes a requests body +func (h *Reader) Close() error { + if h.res != nil { + // TODO: should we read till end? + return h.res.Body.Close() + } + return nil +} + +// NewReader creates a Reader object on a given resource for a given range +func NewReader(resource *Resource, offset, n int64) *Reader { + return &Reader{R: resource, offset: offset, size: n} +} diff --git a/internal/vfs/zip/http_range/resource.go b/internal/vfs/zip/http_range/resource.go new file mode 100644 index 00000000..d35563b0 --- /dev/null +++ b/internal/vfs/zip/http_range/resource.go @@ -0,0 +1,70 @@ +package http_range + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + "strconv" + "strings" +) + +type Resource struct { + URL string + Etag string + LastModified string + Size int64 +} + +func NewResource(ctx context.Context, URL string) (*Resource, error) { + // the `h.URL` is likely presigned only for GET + req, err := http.NewRequest("GET", URL, nil) + if err != nil { + return nil, err + } + + req = req.WithContext(ctx) + + // we fetch a single byte and ensure that range requests is additionally supported + req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", 0, 0)) + res, err := httpClient.Do(req) + if err != nil { + return nil, err + } + defer io.Copy(ioutil.Discard, res.Body) + defer res.Body.Close() + + resource := &Resource{ + URL: URL, + Etag: res.Header.Get("ETag"), + LastModified: res.Header.Get("Last-Modified"), + } + + switch res.StatusCode { + case http.StatusOK: + resource.Size = res.ContentLength + println(resource.URL, resource.Etag, resource.LastModified, resource.Size) + return resource, nil + + case http.StatusPartialContent: + contentRange := res.Header.Get("Content-Range") + ranges := 
strings.SplitN(contentRange, "/", 2) + if len(ranges) != 2 { + return nil, fmt.Errorf("invalid `Content-Range`: %q", contentRange) + } + + resource.Size, err = strconv.ParseInt(ranges[1], 0, 64) + if err != nil { + return nil, err + } + + return resource, nil + + case http.StatusRequestedRangeNotSatisfiable: + return nil, ErrRangeRequestsNotSupported + + default: + return nil, fmt.Errorf("failed with %d: %q", res.StatusCode, res.Status) + } +} diff --git a/internal/vfs/zip/http_read_at.go b/internal/vfs/zip/http_read_at.go deleted file mode 100644 index fa2194fe..00000000 --- a/internal/vfs/zip/http_read_at.go +++ /dev/null @@ -1,58 +0,0 @@ -package zip - -import ( - "errors" - "io" -) - -type httpReadAt struct { - URL string - Size int64 - cached bool - cachedReader *httpReader -} - -func (h *httpReadAt) cachedRead(p []byte, off int64) (n int, err error) { - if off < 0 || off > h.Size { - return 0, errors.New("outside of bounds") - } - - if h.cachedReader != nil && (h.cachedReader.Off != off || h.cachedReader.N < int64(len(p))) { - h.cachedReader.Close() - h.cachedReader = nil - } - - if h.cachedReader == nil { - h.cachedReader = &httpReader{URL: h.URL, Off: off, N: h.Size - off} - } - - return io.ReadFull(h.cachedReader, p) -} - -func (h *httpReadAt) ephemeralRead(p []byte, off int64) (n int, err error) { - r := httpReader{URL: h.URL, Off: off, N: int64(len(p))} - defer r.Close() - - return io.ReadFull(&r, p) -} - -func (h *httpReadAt) ReadAt(p []byte, off int64) (n int, err error) { - if h.cached { - return h.cachedRead(p, off) - } - - return h.ephemeralRead(p, off) -} - -func (h *httpReadAt) withCachedReader(fn func()) { - h.cached = true - - defer func() { - if h.cachedReader != nil { - h.cachedReader.Close() - } - h.cached = false - }() - - fn() -} diff --git a/internal/vfs/zip/http_reader.go b/internal/vfs/zip/http_reader.go deleted file mode 100644 index d7824f83..00000000 --- a/internal/vfs/zip/http_reader.go +++ /dev/null @@ -1,81 +0,0 @@ -package zip - 
-import ( - "fmt" - "io" - "net/http" - "time" - - "gitlab.com/gitlab-org/gitlab-pages/internal/httptransport" - "gitlab.com/gitlab-org/gitlab-pages/metrics" -) - -type httpReader struct { - URL string - Off int64 - N int64 - res *http.Response -} - -var httpClient = &http.Client{ - // TODO: we need connect timeout - // The longest time the request can be executed - Timeout: 30 * time.Minute, - Transport: httptransport.NewTransportWithMetrics(metrics.ZIPHttpReaderReqDuration, metrics.ZIPHttpReaderReqTotal), -} - -var requests int - -func (h *httpReader) ensureRequest(requestedSize int) error { - if h.res != nil { - return nil - } - - req, err := http.NewRequest("GET", h.URL, nil) - if err != nil { - return err - } - requests++ - - req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", h.Off, h.Off+h.N-1)) - res, err := httpClient.Do(req) - if err != nil { - return err - } - - println("HTTP Request", "Range", "off=", h.Off, "n=", h.N, "requestedSize=", requestedSize, "statusCode=", res.StatusCode, "requests=", requests) - if res.StatusCode != http.StatusPartialContent { - res.Body.Close() - // TODO: sanitize URL - return fmt.Errorf("the %q failed with %d: %q", h.URL, res.StatusCode, res.Status) - } - - h.res = res - return nil -} - -func (h *httpReader) Read(p []byte) (int, error) { - if len(p) == 0 { - return 0, nil - } - - if err := h.ensureRequest(len(p)); err != nil { - return 0, err - } - - n, err := h.res.Body.Read(p) - - if err == nil || err == io.EOF { - h.Off += int64(n) - h.N -= int64(n) - } - return n, err -} - -func (h *httpReader) Close() error { - if h.res != nil { - // TODO: should we read till end? 
- return h.res.Body.Close() - } - return nil -} diff --git a/internal/vfs/zip/http_reader_test.go b/internal/vfs/zip/http_reader_test.go deleted file mode 100644 index 32fdc1dc..00000000 --- a/internal/vfs/zip/http_reader_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package zip - -import ( - "archive/zip" - "io/ioutil" - "testing" - - "github.com/stretchr/testify/require" -) - -//const URL = "https://storage.googleapis.com/gitlab-gprd-artifacts/5b/f5/5bf596ed115cd2bf53b9ac39f77563ee26275e449f62c0b96a0fdb5d719ac6da/2020_08_18/692126387/760066867/artifacts.zip?response-content-disposition=attachment%3B%20filename%3D%22artifacts.zip%22%3B%20filename%2A%3DUTF-8%27%27artifacts.zip&response-content-type=application%2Fzip&GoogleAccessId=gitlab-object-storage-prd@gitlab-production.iam.gserviceaccount.com&Signature=2yWByo4dT4Ic6fkrm2asb0TinnFHnELcciZ6qB6Nhc8eNyTYxaZe9aOQblwR%0AWz9yiI84zaD%2F0eZtiJI06OqGz3u%2Bchsc7Mn%2BEdthhmcR9lIUJrbQh96BUEJf%0A0GniiYOGhEdr2gK9sr%2FYPiX7jv4ABkMmyr%2BZdxCPd1%2F%2FnIGFVyTdX07CbsrI%0AYA67RGOez1w9RqcF0wAy5qKs57E9fXshc%2BWqRZD%2Fbtd4PysOHGAT47i7Vslt%0AuzcaGXyiN7Hl8Ckq3WimeacCoB9L%2FNntsLcwx3llKdE0gpzAH04vjiVi705p%0AKV16FuRsF4qbhjYcKjzUao33QVGGXpdOsikF7JrnmA%3D%3D&Expires=1597782702" -const URL = "http://192.168.88.233:9000/test-bucket/doc-gitlab-com.zip?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=TEST_KEY%2F20200818%2F%2Fs3%2Faws4_request&X-Amz-Date=20200818T173935Z&X-Amz-Expires=432000&X-Amz-SignedHeaders=host&X-Amz-Signature=95810918d1b2441a07385838ebba5a0f01fdf4dcdf94ea9c602f8e7d06c84019" - -func findZipFile(zip *zip.Reader, name string) *zip.File { - for _, file := range zip.File { - if file.Name == name { - return file - } - } - return nil -} - -func TestOpenArchiveHTTP(t *testing.T) { - zip, closer, err := openZIPHTTPArchive(URL) - require.NoError(t, err) - defer closer.Close() - - require.NotNil(t, zip) - require.NotEmpty(t, zip.File) - - sitemap := findZipFile(zip, "public/sitemap.xml") - require.NotNil(t, sitemap) - - 
println("DataOffset") - _, err = sitemap.DataOffset() - require.NoError(t, err) - - println("Open") - rc, err := sitemap.Open() - require.NoError(t, err) - defer rc.Close() - - println("ReadAll") - data, err := ioutil.ReadAll(rc) - require.NoError(t, err) - require.NotNil(t, data) -} diff --git a/internal/vfs/zip/http_zip.go b/internal/vfs/zip/http_zip.go deleted file mode 100644 index d17a0e7b..00000000 --- a/internal/vfs/zip/http_zip.go +++ /dev/null @@ -1,74 +0,0 @@ -package zip - -import ( - "archive/zip" - "fmt" - "io" - "io/ioutil" - "net/http" - "strconv" - "strings" -) - -func isHTTPArchive(path string) bool { - return strings.HasPrefix(path, "http://") || strings.HasPrefix(path, "https://") -} - -func httpSize(path string) (int64, error) { - // the `h.URL` is likely presigned only for GET - req, err := http.NewRequest("GET", path, nil) - if err != nil { - return 0, err - } - - req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", 0, 0)) - res, err := httpClient.Do(req) - if err != nil { - return 0, err - } - defer io.Copy(ioutil.Discard, res.Body) - defer res.Body.Close() - - if res.StatusCode != http.StatusPartialContent { - // TODO: sanitize URL - return 0, fmt.Errorf("the %q failed with %d: %q", path, res.StatusCode, res.Status) - } - - contentRange := res.Header.Get("Content-Range") - ranges := strings.SplitN(contentRange, "/", 2) - if len(ranges) != 2 { - return 0, fmt.Errorf("the %q has invalid `Content-Range`: %q", path, contentRange) - } - - return strconv.ParseInt(ranges[1], 0, 64) -} - -func openZIPHTTPArchive(url string) (zipReader *zip.Reader, closer io.Closer, err error) { - size, err := httpSize(url) - if err != nil { - return nil, nil, err - } - - httpReader := &httpReadAt{URL: url, Size: size, cached: true} - httpReader.withCachedReader(func() { - zipReader, err = zip.NewReader(httpReader, size) - }) - - return zipReader, ioutil.NopCloser(nil), err -} - -func openZIPDiskArchive(path string) (*zip.Reader, io.Closer, error) { - r, err := 
zip.OpenReader(path) - if err != nil { - return nil, nil, err - } - return &r.Reader, r, nil -} - -func openZIPArchive(path string) (*zip.Reader, io.Closer, error) { - if isHTTPArchive(path) { - return openZIPHTTPArchive(path) - } - - return openZIPDiskArchive(path) -} |