Welcome to the mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-pages.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKamil Trzciński <ayufan@ayufan.eu>2020-08-20 18:31:30 +0300
committerKamil Trzciński <ayufan@ayufan.eu>2020-08-21 11:32:06 +0300
commitf59aec9d526f05bde08f5c90cc4a1c4c1d8766eb (patch)
treeb7a1d8562e54a62261390f14d6f6bd99fea44863
parent71a39de011a7b2d19470c32ed8f73afc372636bf (diff)
Refactor ZIP support to be more OOMzip-using-vfs
-rw-r--r--internal/serving/disk/serving.go2
-rw-r--r--internal/vfs/zip/archive.go77
-rw-r--r--internal/vfs/zip/archive_test.go2
-rw-r--r--internal/vfs/zip/http_range/http_read_at.go54
-rw-r--r--internal/vfs/zip/http_range/http_reader.go129
-rw-r--r--internal/vfs/zip/http_range/resource.go70
-rw-r--r--internal/vfs/zip/http_read_at.go58
-rw-r--r--internal/vfs/zip/http_reader.go81
-rw-r--r--internal/vfs/zip/http_reader_test.go47
-rw-r--r--internal/vfs/zip/http_zip.go74
10 files changed, 294 insertions, 300 deletions
diff --git a/internal/serving/disk/serving.go b/internal/serving/disk/serving.go
index a27b1bc4..535e03cb 100644
--- a/internal/serving/disk/serving.go
+++ b/internal/serving/disk/serving.go
@@ -15,7 +15,7 @@ var disk = &Disk{
reader: Reader{
fileSizeMetric: metrics.DiskServingFileSize,
vfs: map[string]vfs.VFS{
- "": localVFS,
+ "": localVFS, // default to use if not specified
"local": localVFS,
"zip": vfs.Instrumented(zip.New(), "zip"),
},
diff --git a/internal/vfs/zip/archive.go b/internal/vfs/zip/archive.go
index 3a53c417..627c59f2 100644
--- a/internal/vfs/zip/archive.go
+++ b/internal/vfs/zip/archive.go
@@ -12,29 +12,27 @@ import (
"sync"
"gitlab.com/gitlab-org/gitlab-pages/internal/vfs"
+ "gitlab.com/gitlab-org/gitlab-pages/internal/vfs/zip/http_range"
)
const dirPrefix = "public/"
const maxSymlinkSize = 256
type zipArchive struct {
- path string
- once sync.Once
- done chan struct{}
- zip *zip.Reader
- zipCloser io.Closer
- files map[string]*zip.File
- zipErr error
+ path string
+ once sync.Once
+ done chan struct{}
+
+ resource *http_range.Resource
+ reader *http_range.ReadAtReader
+ archive *zip.Reader
+ err error
+
+ files map[string]*zip.File
}
func (a *zipArchive) openArchive(ctx context.Context) error {
- a.once.Do(func() {
- a.zip, a.zipCloser, a.zipErr = openZIPArchive(a.path)
- if a.zip != nil {
- a.processZip()
- }
- close(a.done)
- })
+ a.once.Do(a.readArchive)
// wait for it to close
// or exit early
@@ -42,31 +40,41 @@ func (a *zipArchive) openArchive(ctx context.Context) error {
case <-a.done:
case <-ctx.Done():
}
- return a.zipErr
+ return a.err
}
-func (a *zipArchive) processZip() {
- for _, file := range a.zip.File {
- if !strings.HasPrefix(file.Name, dirPrefix) {
- continue
+func (a *zipArchive) readArchive() {
+ a.resource, a.err = http_range.NewResource(context.Background(), a.path)
+ if a.err != nil {
+ return
+ }
+
+ a.reader = http_range.NewReadAt(a.resource)
+ a.reader.WithCachedReader(func() {
+ a.archive, a.err = zip.NewReader(a.reader, a.resource.Size)
+ })
+
+ if a.archive != nil {
+ for _, file := range a.archive.File {
+ if !strings.HasPrefix(file.Name, dirPrefix) {
+ continue
+ }
+ a.files[file.Name] = file
}
- a.files[file.Name] = file
+
+ // recycle memory
+ a.archive.File = nil
}
- // recycle memory
- a.zip.File = nil
+ close(a.done)
}
func (a *zipArchive) close() {
- if a.zipCloser != nil {
- a.zipCloser.Close()
- }
- a.zipCloser = nil
- a.zip = nil
+ // no-op: everything can be GC recycled
}
func (a *zipArchive) findFile(name string) *zip.File {
- name = filepath.Join("public", name)
+ name = filepath.Join(dirPrefix, name)
if file := a.files[name]; file != nil {
return file
@@ -128,17 +136,8 @@ func (a *zipArchive) Open(ctx context.Context, name string) (vfs.File, error) {
}
// TODO: We can support `io.Seeker` if file would not be compressed
-
- if !isHTTPArchive(a.path) {
- return file.Open()
- }
-
- var reader io.ReadCloser
- reader = &httpReader{
- URL: a.path,
- Off: dataOffset,
- N: int64(file.UncompressedSize64),
- }
+ var reader vfs.File
+ reader = a.reader.SectionReader(dataOffset, int64(file.CompressedSize64))
switch file.Method {
case zip.Deflate:
diff --git a/internal/vfs/zip/archive_test.go b/internal/vfs/zip/archive_test.go
index d550b635..fe7b0d50 100644
--- a/internal/vfs/zip/archive_test.go
+++ b/internal/vfs/zip/archive_test.go
@@ -9,6 +9,8 @@ import (
"github.com/stretchr/testify/require"
)
+const URL = "http://192.168.88.233:9000/test-bucket/doc-gitlab-com.zip?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=TEST_KEY%2F20200820%2F%2Fs3%2Faws4_request&X-Amz-Date=20200820T152420Z&X-Amz-Expires=432000&X-Amz-SignedHeaders=host&X-Amz-Signature=fcf49604f53564ce1648e5a0c2d8f1186ba3d9dd5e40d2c3244c57053e0348e9"
+
func TestOpenArchive(t *testing.T) {
zip := newArchive(URL)
defer zip.close()
diff --git a/internal/vfs/zip/http_range/http_read_at.go b/internal/vfs/zip/http_range/http_read_at.go
new file mode 100644
index 00000000..85d6205e
--- /dev/null
+++ b/internal/vfs/zip/http_range/http_read_at.go
@@ -0,0 +1,54 @@
+package http_range
+
+import (
+ "io"
+)
+
+type ReadAtReader struct {
+ R *Resource
+ cachedReader *Reader
+}
+
+func (h *ReadAtReader) cachedRead(p []byte, off int64) (n int, err error) {
+ if !h.cachedReader.WithinRange(off, int64(len(p))) {
+ h.cachedReader.Close()
+ h.cachedReader = NewReader(h.R, off, h.R.Size-off)
+ }
+
+ return io.ReadFull(h.cachedReader, p)
+}
+
+func (h *ReadAtReader) ephemeralRead(p []byte, off int64) (n int, err error) {
+ reader := NewReader(h.R, off, int64(len(p)))
+ defer reader.Close()
+
+ return io.ReadFull(reader, p)
+}
+
+func (h *ReadAtReader) SectionReader(off, n int64) *Reader {
+ return NewReader(h.R, off, n)
+}
+
+func (h *ReadAtReader) ReadAt(p []byte, off int64) (n int, err error) {
+ if h.cachedReader != nil {
+ return h.cachedRead(p, off)
+ }
+
+ return h.ephemeralRead(p, off)
+}
+
+func (h *ReadAtReader) WithCachedReader(fn func()) {
+ h.cachedReader = NewReader(h.R, 0, h.R.Size)
+
+ defer func() {
+ h.cachedReader.Close()
+ h.cachedReader = nil
+ }()
+
+ fn()
+}
+
+// NewReadAt creates a ReadAt object on a given resource
+func NewReadAt(resource *Resource) *ReadAtReader {
+ return &ReadAtReader{R: resource}
+}
diff --git a/internal/vfs/zip/http_range/http_reader.go b/internal/vfs/zip/http_range/http_reader.go
new file mode 100644
index 00000000..e9db1255
--- /dev/null
+++ b/internal/vfs/zip/http_range/http_reader.go
@@ -0,0 +1,129 @@
+package http_range
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "net/http"
+ "time"
+
+ "gitlab.com/gitlab-org/gitlab-pages/internal/httptransport"
+ "gitlab.com/gitlab-org/gitlab-pages/metrics"
+)
+
+var (
+ // ErrRangeRequestsNotSupported is returned by Seek and Read
+ // when the remote server does not allow range requests (Accept-Ranges was not set)
+ ErrRangeRequestsNotSupported = errors.New("range requests are not supported by the remote server")
+ // ErrInvalidRange is returned by Read when trying to read past the end of the file
+ ErrInvalidRange = errors.New("invalid range")
+ // ErrContentHasChanged is returned by Read when the content has changed since the first request
+ ErrContentHasChanged = errors.New("content has changed since first request")
+)
+
+type Reader struct {
+ R *Resource
+ offset, size int64
+ res *http.Response
+}
+
+var httpClient = &http.Client{
+ // TODO: we need connect timeout
+ // The longest time the request can be executed
+ Timeout: 30 * time.Minute,
+ Transport: httptransport.NewTransportWithMetrics(metrics.ZIPHttpReaderReqDuration, metrics.ZIPHttpReaderReqTotal),
+}
+
+func (h *Reader) ensureRequest() (err error) {
+ if h.res != nil {
+ return nil
+ }
+
+ if h.offset < 0 || h.size < 0 || h.offset+h.size > h.R.Size {
+ return ErrInvalidRange
+ }
+
+ req, err := http.NewRequest("GET", h.R.URL, nil)
+ if err != nil {
+ return err
+ }
+
+ if h.R.LastModified != "" {
+ req.Header.Set("If-Range", h.R.LastModified)
+ } else if h.R.Etag != "" {
+ req.Header.Set("If-Range", h.R.Etag)
+ }
+
+ req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", h.offset, h.offset+h.size-1))
+
+ res, err := httpClient.Do(req)
+ if err != nil {
+ return err
+ }
+
+ // cleanup body on failure to avoid memory leak
+ defer func() {
+ if err != nil {
+ res.Body.Close()
+ }
+ }()
+
+ switch res.StatusCode {
+ case http.StatusOK:
+ // some servers return 200 OK for bytes=0-
+ if h.offset > 0 || h.R.Etag != "" && h.R.Etag != res.Header.Get("ETag") {
+ return ErrContentHasChanged
+ }
+ break
+
+ case http.StatusPartialContent:
+ break
+
+ case http.StatusRequestedRangeNotSatisfiable:
+ return ErrRangeRequestsNotSupported
+
+ default:
+ return fmt.Errorf("failed with %d: %q", res.StatusCode, res.Status)
+ }
+
+ h.res = res
+ return nil
+}
+
+// WithinRange checks if a given data can be read efficiently
+func (h *Reader) WithinRange(offset, n int64) bool {
+ return h.offset == offset && n <= h.size
+}
+
+// Read reads a data into a given buffer
+func (h *Reader) Read(p []byte) (int, error) {
+ if len(p) == 0 {
+ return 0, nil
+ }
+
+ if err := h.ensureRequest(); err != nil {
+ return 0, err
+ }
+
+ n, err := h.res.Body.Read(p)
+
+ if err == nil || err == io.EOF {
+ h.offset += int64(n)
+ h.size -= int64(n)
+ }
+ return n, err
+}
+
+// Close closes a requests body
+func (h *Reader) Close() error {
+ if h.res != nil {
+ // TODO: should we read till end?
+ return h.res.Body.Close()
+ }
+ return nil
+}
+
+// NewReader creates a Reader object on a given resource for a given range
+func NewReader(resource *Resource, offset, n int64) *Reader {
+ return &Reader{R: resource, offset: offset, size: n}
+}
diff --git a/internal/vfs/zip/http_range/resource.go b/internal/vfs/zip/http_range/resource.go
new file mode 100644
index 00000000..d35563b0
--- /dev/null
+++ b/internal/vfs/zip/http_range/resource.go
@@ -0,0 +1,70 @@
+package http_range
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "strconv"
+ "strings"
+)
+
+type Resource struct {
+ URL string
+ Etag string
+ LastModified string
+ Size int64
+}
+
+func NewResource(ctx context.Context, URL string) (*Resource, error) {
+ // the `h.URL` is likely presigned only for GET
+ req, err := http.NewRequest("GET", URL, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ req = req.WithContext(ctx)
+
+ // we fetch a single byte and ensure that range requests is additionally supported
+ req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", 0, 0))
+ res, err := httpClient.Do(req)
+ if err != nil {
+ return nil, err
+ }
+ defer io.Copy(ioutil.Discard, res.Body)
+ defer res.Body.Close()
+
+ resource := &Resource{
+ URL: URL,
+ Etag: res.Header.Get("ETag"),
+ LastModified: res.Header.Get("Last-Modified"),
+ }
+
+ switch res.StatusCode {
+ case http.StatusOK:
+ resource.Size = res.ContentLength
+ println(resource.URL, resource.Etag, resource.LastModified, resource.Size)
+ return resource, nil
+
+ case http.StatusPartialContent:
+ contentRange := res.Header.Get("Content-Range")
+ ranges := strings.SplitN(contentRange, "/", 2)
+ if len(ranges) != 2 {
+ return nil, fmt.Errorf("invalid `Content-Range`: %q", contentRange)
+ }
+
+ resource.Size, err = strconv.ParseInt(ranges[1], 0, 64)
+ if err != nil {
+ return nil, err
+ }
+
+ return resource, nil
+
+ case http.StatusRequestedRangeNotSatisfiable:
+ return nil, ErrRangeRequestsNotSupported
+
+ default:
+ return nil, fmt.Errorf("failed with %d: %q", res.StatusCode, res.Status)
+ }
+}
diff --git a/internal/vfs/zip/http_read_at.go b/internal/vfs/zip/http_read_at.go
deleted file mode 100644
index fa2194fe..00000000
--- a/internal/vfs/zip/http_read_at.go
+++ /dev/null
@@ -1,58 +0,0 @@
-package zip
-
-import (
- "errors"
- "io"
-)
-
-type httpReadAt struct {
- URL string
- Size int64
- cached bool
- cachedReader *httpReader
-}
-
-func (h *httpReadAt) cachedRead(p []byte, off int64) (n int, err error) {
- if off < 0 || off > h.Size {
- return 0, errors.New("outside of bounds")
- }
-
- if h.cachedReader != nil && (h.cachedReader.Off != off || h.cachedReader.N < int64(len(p))) {
- h.cachedReader.Close()
- h.cachedReader = nil
- }
-
- if h.cachedReader == nil {
- h.cachedReader = &httpReader{URL: h.URL, Off: off, N: h.Size - off}
- }
-
- return io.ReadFull(h.cachedReader, p)
-}
-
-func (h *httpReadAt) ephemeralRead(p []byte, off int64) (n int, err error) {
- r := httpReader{URL: h.URL, Off: off, N: int64(len(p))}
- defer r.Close()
-
- return io.ReadFull(&r, p)
-}
-
-func (h *httpReadAt) ReadAt(p []byte, off int64) (n int, err error) {
- if h.cached {
- return h.cachedRead(p, off)
- }
-
- return h.ephemeralRead(p, off)
-}
-
-func (h *httpReadAt) withCachedReader(fn func()) {
- h.cached = true
-
- defer func() {
- if h.cachedReader != nil {
- h.cachedReader.Close()
- }
- h.cached = false
- }()
-
- fn()
-}
diff --git a/internal/vfs/zip/http_reader.go b/internal/vfs/zip/http_reader.go
deleted file mode 100644
index d7824f83..00000000
--- a/internal/vfs/zip/http_reader.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package zip
-
-import (
- "fmt"
- "io"
- "net/http"
- "time"
-
- "gitlab.com/gitlab-org/gitlab-pages/internal/httptransport"
- "gitlab.com/gitlab-org/gitlab-pages/metrics"
-)
-
-type httpReader struct {
- URL string
- Off int64
- N int64
- res *http.Response
-}
-
-var httpClient = &http.Client{
- // TODO: we need connect timeout
- // The longest time the request can be executed
- Timeout: 30 * time.Minute,
- Transport: httptransport.NewTransportWithMetrics(metrics.ZIPHttpReaderReqDuration, metrics.ZIPHttpReaderReqTotal),
-}
-
-var requests int
-
-func (h *httpReader) ensureRequest(requestedSize int) error {
- if h.res != nil {
- return nil
- }
-
- req, err := http.NewRequest("GET", h.URL, nil)
- if err != nil {
- return err
- }
- requests++
-
- req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", h.Off, h.Off+h.N-1))
- res, err := httpClient.Do(req)
- if err != nil {
- return err
- }
-
- println("HTTP Request", "Range", "off=", h.Off, "n=", h.N, "requestedSize=", requestedSize, "statusCode=", res.StatusCode, "requests=", requests)
- if res.StatusCode != http.StatusPartialContent {
- res.Body.Close()
- // TODO: sanitize URL
- return fmt.Errorf("the %q failed with %d: %q", h.URL, res.StatusCode, res.Status)
- }
-
- h.res = res
- return nil
-}
-
-func (h *httpReader) Read(p []byte) (int, error) {
- if len(p) == 0 {
- return 0, nil
- }
-
- if err := h.ensureRequest(len(p)); err != nil {
- return 0, err
- }
-
- n, err := h.res.Body.Read(p)
-
- if err == nil || err == io.EOF {
- h.Off += int64(n)
- h.N -= int64(n)
- }
- return n, err
-}
-
-func (h *httpReader) Close() error {
- if h.res != nil {
- // TODO: should we read till end?
- return h.res.Body.Close()
- }
- return nil
-}
diff --git a/internal/vfs/zip/http_reader_test.go b/internal/vfs/zip/http_reader_test.go
deleted file mode 100644
index 32fdc1dc..00000000
--- a/internal/vfs/zip/http_reader_test.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package zip
-
-import (
- "archive/zip"
- "io/ioutil"
- "testing"
-
- "github.com/stretchr/testify/require"
-)
-
-//const URL = "https://storage.googleapis.com/gitlab-gprd-artifacts/5b/f5/5bf596ed115cd2bf53b9ac39f77563ee26275e449f62c0b96a0fdb5d719ac6da/2020_08_18/692126387/760066867/artifacts.zip?response-content-disposition=attachment%3B%20filename%3D%22artifacts.zip%22%3B%20filename%2A%3DUTF-8%27%27artifacts.zip&response-content-type=application%2Fzip&GoogleAccessId=gitlab-object-storage-prd@gitlab-production.iam.gserviceaccount.com&Signature=2yWByo4dT4Ic6fkrm2asb0TinnFHnELcciZ6qB6Nhc8eNyTYxaZe9aOQblwR%0AWz9yiI84zaD%2F0eZtiJI06OqGz3u%2Bchsc7Mn%2BEdthhmcR9lIUJrbQh96BUEJf%0A0GniiYOGhEdr2gK9sr%2FYPiX7jv4ABkMmyr%2BZdxCPd1%2F%2FnIGFVyTdX07CbsrI%0AYA67RGOez1w9RqcF0wAy5qKs57E9fXshc%2BWqRZD%2Fbtd4PysOHGAT47i7Vslt%0AuzcaGXyiN7Hl8Ckq3WimeacCoB9L%2FNntsLcwx3llKdE0gpzAH04vjiVi705p%0AKV16FuRsF4qbhjYcKjzUao33QVGGXpdOsikF7JrnmA%3D%3D&Expires=1597782702"
-const URL = "http://192.168.88.233:9000/test-bucket/doc-gitlab-com.zip?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=TEST_KEY%2F20200818%2F%2Fs3%2Faws4_request&X-Amz-Date=20200818T173935Z&X-Amz-Expires=432000&X-Amz-SignedHeaders=host&X-Amz-Signature=95810918d1b2441a07385838ebba5a0f01fdf4dcdf94ea9c602f8e7d06c84019"
-
-func findZipFile(zip *zip.Reader, name string) *zip.File {
- for _, file := range zip.File {
- if file.Name == name {
- return file
- }
- }
- return nil
-}
-
-func TestOpenArchiveHTTP(t *testing.T) {
- zip, closer, err := openZIPHTTPArchive(URL)
- require.NoError(t, err)
- defer closer.Close()
-
- require.NotNil(t, zip)
- require.NotEmpty(t, zip.File)
-
- sitemap := findZipFile(zip, "public/sitemap.xml")
- require.NotNil(t, sitemap)
-
- println("DataOffset")
- _, err = sitemap.DataOffset()
- require.NoError(t, err)
-
- println("Open")
- rc, err := sitemap.Open()
- require.NoError(t, err)
- defer rc.Close()
-
- println("ReadAll")
- data, err := ioutil.ReadAll(rc)
- require.NoError(t, err)
- require.NotNil(t, data)
-}
diff --git a/internal/vfs/zip/http_zip.go b/internal/vfs/zip/http_zip.go
deleted file mode 100644
index d17a0e7b..00000000
--- a/internal/vfs/zip/http_zip.go
+++ /dev/null
@@ -1,74 +0,0 @@
-package zip
-
-import (
- "archive/zip"
- "fmt"
- "io"
- "io/ioutil"
- "net/http"
- "strconv"
- "strings"
-)
-
-func isHTTPArchive(path string) bool {
- return strings.HasPrefix(path, "http://") || strings.HasPrefix(path, "https://")
-}
-
-func httpSize(path string) (int64, error) {
- // the `h.URL` is likely presigned only for GET
- req, err := http.NewRequest("GET", path, nil)
- if err != nil {
- return 0, err
- }
-
- req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", 0, 0))
- res, err := httpClient.Do(req)
- if err != nil {
- return 0, err
- }
- defer io.Copy(ioutil.Discard, res.Body)
- defer res.Body.Close()
-
- if res.StatusCode != http.StatusPartialContent {
- // TODO: sanitize URL
- return 0, fmt.Errorf("the %q failed with %d: %q", path, res.StatusCode, res.Status)
- }
-
- contentRange := res.Header.Get("Content-Range")
- ranges := strings.SplitN(contentRange, "/", 2)
- if len(ranges) != 2 {
- return 0, fmt.Errorf("the %q has invalid `Content-Range`: %q", path, contentRange)
- }
-
- return strconv.ParseInt(ranges[1], 0, 64)
-}
-
-func openZIPHTTPArchive(url string) (zipReader *zip.Reader, closer io.Closer, err error) {
- size, err := httpSize(url)
- if err != nil {
- return nil, nil, err
- }
-
- httpReader := &httpReadAt{URL: url, Size: size, cached: true}
- httpReader.withCachedReader(func() {
- zipReader, err = zip.NewReader(httpReader, size)
- })
-
- return zipReader, ioutil.NopCloser(nil), err
-}
-
-func openZIPDiskArchive(path string) (*zip.Reader, io.Closer, error) {
- r, err := zip.OpenReader(path)
- if err != nil {
- return nil, nil, err
- }
- return &r.Reader, r, nil
-}
-
-func openZIPArchive(path string) (*zip.Reader, io.Closer, error) {
- if isHTTPArchive(path) {
- return openZIPHTTPArchive(path)
- }
-
- return openZIPDiskArchive(path)
-}