diff options
author | Kamil Trzciński <ayufan@ayufan.eu> | 2020-08-17 17:38:54 +0300 |
---|---|---|
committer | Kamil Trzciński <ayufan@ayufan.eu> | 2020-08-17 17:54:43 +0300 |
commit | 61873541514a10ada0e7b215dcd40a94e013ef2d (patch) | |
tree | 552377268cbdf044f09a508b2d1160fc8d37fcb7 | |
parent | 3957883e3d210105594fb1872147a47345d3d462 (diff) |
Support remote ZIP (ref: zip-using-remote-vfs)
-rw-r--r-- | internal/serving/file/zip/serving.go | 6 | ||||
-rw-r--r-- | internal/vfs/zip/archive.go | 60 | ||||
-rw-r--r-- | internal/vfs/zip/deflate_reader.go | 27 | ||||
-rw-r--r-- | internal/vfs/zip/http_reader.go | 140 | ||||
-rw-r--r-- | internal/vfs/zip/vfs.go | 26 | ||||
-rw-r--r-- | metrics/metrics.go | 12 |
6 files changed, 246 insertions, 25 deletions
diff --git a/internal/serving/file/zip/serving.go b/internal/serving/file/zip/serving.go index 3acaea07..d6ce7073 100644 --- a/internal/serving/file/zip/serving.go +++ b/internal/serving/file/zip/serving.go @@ -4,13 +4,13 @@ import ( "gitlab.com/gitlab-org/gitlab-pages/internal/serving" "gitlab.com/gitlab-org/gitlab-pages/internal/serving/file" "gitlab.com/gitlab-org/gitlab-pages/internal/vfs" - "gitlab.com/gitlab-org/gitlab-pages/internal/vfs/local" + "gitlab.com/gitlab-org/gitlab-pages/internal/vfs/zip" ) -var zip = file.New(vfs.Instrumented(local.VFS{}, "zip")) +var zipServing = file.New(vfs.Instrumented(zip.New(), "zip")) // New returns a serving instance that is capable of reading files // from the disk func New() serving.Serving { - return zip + return zipServing } diff --git a/internal/vfs/zip/archive.go b/internal/vfs/zip/archive.go index aeaf515a..4f7ef699 100644 --- a/internal/vfs/zip/archive.go +++ b/internal/vfs/zip/archive.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "os" "strings" + "fmt" "sync" "gitlab.com/gitlab-org/gitlab-pages/internal/vfs" @@ -16,17 +17,18 @@ const dirPrefix = "public/" const maxSymlinkSize = 256 type zipArchive struct { - path string - once sync.Once - done chan struct{} - zip *zip.ReadCloser - files map[string]*zip.File - zipErr error + path string + once sync.Once + done chan struct{} + zip *zip.Reader + zipCloser io.Closer + files map[string]*zip.File + zipErr error } -func (a *zipArchive) Open(ctx context.Context) error { +func (a *zipArchive) openArchive(ctx context.Context) error { a.once.Do(func() { - a.zip, a.zipErr = zip.OpenReader(a.path) + a.zip, a.zipCloser, a.zipErr = openZIPArchive(a.path) if a.zip != nil { a.processZip() } @@ -55,11 +57,12 @@ func (a *zipArchive) processZip() { a.zip.File = nil } -func (a *zipArchive) Close() { - if a.zip != nil { - a.zip.Close() - a.zip = nil +func (a *zipArchive) close() { + if a.zipCloser != nil { + a.zipCloser.Close() } + a.zipCloser = nil + a.zip = nil } func (a *zipArchive) 
Lstat(ctx context.Context, name string) (os.FileInfo, error) { @@ -100,12 +103,39 @@ func (a *zipArchive) Open(ctx context.Context, name string) (vfs.File, error) { return nil, os.ErrNotExist } - rc, err := file.Open() + dataOffset, err := file.DataOffset() + if err != nil { + return nil, err + } + // TODO: We can support `io.Seeker` if file would not be compressed - return rc, err + + if !isHTTPArchive(a.path) { + return file.Open() + } + + var reader io.ReadCloser + reader = &httpReader{ + URL: a.path, + Off: dataOffset, + N: int64(file.UncompressedSize64), + } + + switch file.Method { + case zip.Deflate: + reader = newDeflateReader(reader) + + case zip.Store: + // no-op + + default: + return nil, fmt.Errorf("unsupported compression: %x", file.Method) + } + + return reader, nil } -func newArchive(path string) zipArchive { +func newArchive(path string) *zipArchive { return &zipArchive{ path: path, done: make(chan struct{}), diff --git a/internal/vfs/zip/deflate_reader.go b/internal/vfs/zip/deflate_reader.go new file mode 100644 index 00000000..2e55ee5a --- /dev/null +++ b/internal/vfs/zip/deflate_reader.go @@ -0,0 +1,27 @@ +package zip + +import ( + "compress/flate" + "io" +) + +type deflateReader struct { + R io.ReadCloser + D io.ReadCloser +} + +func (r *deflateReader) Read(p []byte) (n int, err error) { + return r.D.Read(p) +} + +func (r *deflateReader) Close() error { + r.R.Close() + return r.D.Close() +} + +func newDeflateReader(r io.ReadCloser) *deflateReader { + return &deflateReader{ + R: r, + D: flate.NewReader(r), + } +} diff --git a/internal/vfs/zip/http_reader.go b/internal/vfs/zip/http_reader.go new file mode 100644 index 00000000..99c94b11 --- /dev/null +++ b/internal/vfs/zip/http_reader.go @@ -0,0 +1,140 @@ +package zip + +import ( + "archive/zip" + "fmt" + "io" + "io/ioutil" + "net/http" + "strings" + "time" + + "gitlab.com/gitlab-org/gitlab-pages/internal/httptransport" + "gitlab.com/gitlab-org/gitlab-pages/metrics" +) + +type httpReader struct { 
+ URL string + Off int64 + N int64 + res *http.Response +} + +var httpClient = &http.Client{ + // TODO: we need connect timeout + // The longest time the request can be executed + Timeout: 30 * time.Minute, + Transport: httptransport.NewTransportWithMetrics(metrics.ZIPHttpReaderReqDuration, metrics.ZIPHttpReaderReqTotal), +} + +func (h *httpReader) ensureRequest() error { + if h.res != nil { + return nil + } + + req, err := http.NewRequest("GET", h.URL, nil) + if err != nil { + return err + } + + req.Header.Set("Range", fmt.Sprintf("%d-%d", h.Off, h.Off+h.N-1)) + res, err := httpClient.Do(req) + if err != nil { + return err + } + if res.StatusCode != http.StatusOK { + res.Body.Close() + // TODO: sanitize URL + return fmt.Errorf("the %q failed with %d: %q", h.URL, res.StatusCode, res.Status) + } + + return nil +} + +func (h *httpReader) Read(p []byte) (n int, err error) { + if len(p) == 0 { + return 0, nil + } + + if err := h.ensureRequest(); err != nil { + return 0, err + } + + return h.res.Body.Read(p) +} + +func (h *httpReader) Close() error { + if h.res != nil { + // TODO: should we read till end? + return h.res.Body.Close() + } + return nil +} + +type httpReadAt struct { + URL string +} + +func (h *httpReadAt) ReadAt(p []byte, off int64) (n int, err error) { + r := httpReader{URL: h.URL, Off: off, N: int64(len(p))} + defer r.Close() + + // TODO: + // Even if ReadAt returns n < len(p), it may use all of p as scratch space during the call. + // If some data is available but not len(p) bytes, ReadAt blocks until either all the data + // is available or an error occurs. In this respect ReadAt is different from Read. 
+ return r.Read(p) +} + +func isHTTPArchive(path string) bool { + return strings.HasPrefix(path, "https://") +} + +func httpSize(path string) (int64, error) { + // the `h.URL` is likely presigned only for GET + req, err := http.NewRequest("GET", path, nil) + if err != nil { + return 0, err + } + + req.Header.Set("Range", fmt.Sprintf("%d-%d", 0, 0)) + res, err := httpClient.Do(req) + if err != nil { + return 0, err + } + defer io.Copy(ioutil.Discard, res.Body) + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + // TODO: sanitize URL + return 0, fmt.Errorf("the %q failed with %d: %q", path, res.StatusCode, res.Status) + } + + return res.ContentLength, nil +} + +func openZIPHTTPArchive(url string) (*zip.Reader, io.Closer, error) { + size, err := httpSize(url) + if err != nil { + return nil, nil, err + } + + r, err := zip.NewReader(&httpReadAt{URL: url}, size) + return r, nil, err +} + +func openZIPDiskArchive(path string) (*zip.Reader, io.Closer, error) { + r, err := zip.OpenReader(path) + if err != nil { + return nil, nil, err + } + return &r.Reader, r, nil +} + +func openZIPArchive(path string) (*zip.Reader, io.Closer, error) { + if isHTTPArchive(path) { + return openZIPHTTPArchive(path) + } + + return openZIPDiskArchive(path) +} diff --git a/internal/vfs/zip/vfs.go b/internal/vfs/zip/vfs.go index b5ba720f..acdb8de3 100644 --- a/internal/vfs/zip/vfs.go +++ b/internal/vfs/zip/vfs.go @@ -8,6 +8,10 @@ import ( "gitlab.com/gitlab-org/gitlab-pages/internal/vfs" ) +const cacheExpirationInterval = time.Minute +const cacheRefreshInterval = time.Minute / 2 +const cacheEvictInterval = time.Minute + type zipVFS struct { cache *cache.Cache } @@ -15,8 +19,13 @@ type zipVFS struct { func (fs *zipVFS) Dir(ctx context.Context, path string) (vfs.Dir, error) { // we do it in loop to not use any additional locks for { - dir, found := fs.cache.Get(path) - if !found { + dir, till, found := fs.cache.GetWithExpiration(path) + if found { + if till.Sub(time.Now()) < 
cacheRefreshInterval { + // refresh item + fs.cache.Set(path, dir, cache.DefaultExpiration) + } + } else { dir = newArchive(path) // if it errors, it means that it is already added @@ -26,18 +35,21 @@ func (fs *zipVFS) Dir(ctx context.Context, path string) (vfs.Dir, error) { } } - err := dir.(*zipArchive).Open(ctx) - return dir, err + zipDir := dir.(*zipArchive) + + err := zipDir.openArchive(ctx) + return zipDir, err } } func New() vfs.VFS { vfs := &zipVFS{ - cache: cache.New(time.Minute, 2*time.Minute), + cache: cache.New(cacheExpirationInterval, cacheRefreshInterval), } + vfs.cache.OnEvicted(func(path string, object interface{}) { - if archive, ok := object.(*Archive); archive != nil && ok { - archive.Close() + if archive, ok := object.(*zipArchive); archive != nil && ok { + archive.close() } }) return vfs diff --git a/metrics/metrics.go b/metrics/metrics.go index 0792a41f..f6f82014 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -77,6 +77,18 @@ var ( Help: "The time (in seconds) it takes to get a response from the GitLab domains API", }, []string{"status_code"}) + // DomainsSourceAPIReqTotal is the number of calls made to the Object Storage that returned a 4XX error + ZIPHttpReaderReqTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "gitlab_pages_zip_reader_requests_total", + Help: "The number of Object Storage API calls with different status codes", + }, []string{"status_code"}) + + // DomainsSourceAPICallDuration is the time it takes to get a response from the Object Storage in seconds + ZIPHttpReaderReqDuration = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "gitlab_pages_zip_reader_requests_duration", + Help: "The time (in seconds) it takes to get a response from the Object Storage", + }, []string{"status_code"}) + // DiskServingFileSize metric for file size serving. 
serving_types: disk and object_storage DiskServingFileSize = prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "gitlab_pages_disk_serving_file_size_bytes", |