Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-pages.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKamil TrzciƄski <ayufan@ayufan.eu>2020-11-10 14:20:38 +0300
committerVladimir Shushlin <vshushlin@gitlab.com>2020-11-10 14:20:38 +0300
commit204e25d4d59f0923369811309afa1f37c148b965 (patch)
treef51992d88f9b5c47ddd00c6edac17246df9f6714
parent4fbcf6cd329906f04b850c48413cca623e2e189d (diff)
Allow to refresh an existing cached archives when accessed
If archive is broken (which should in fact never happen) we fail the first request and mark the cache entry as invalid. It will be refreshed on a next try.
-rw-r--r--internal/httprange/http_reader.go23
-rw-r--r--internal/httprange/http_reader_test.go57
-rw-r--r--internal/httprange/resource.go53
-rw-r--r--internal/httprange/resource_test.go31
-rw-r--r--internal/vfs/zip/archive.go42
-rw-r--r--internal/vfs/zip/archive_test.go137
-rw-r--r--internal/vfs/zip/vfs.go63
-rw-r--r--internal/vfs/zip/vfs_test.go8
8 files changed, 308 insertions, 106 deletions
diff --git a/internal/httprange/http_reader.go b/internal/httprange/http_reader.go
index 467256a0..589351fa 100644
--- a/internal/httprange/http_reader.go
+++ b/internal/httprange/http_reader.go
@@ -18,15 +18,12 @@ var (
ErrNotFound = errors.New("resource not found")
// ErrRangeRequestsNotSupported is returned by Seek and Read
- // when the remote server does not allow range requests (Accept-Ranges was not set)
- ErrRangeRequestsNotSupported = errors.New("range requests are not supported by the remote server")
+ // when the remote server does not allow range requests for a given request parameters
+ ErrRangeRequestsNotSupported = errors.New("requests range is not supported by the remote server")
// ErrInvalidRange is returned by Read when trying to read past the end of the file
ErrInvalidRange = errors.New("invalid range")
- // ErrContentHasChanged is returned by Read when the content has changed since the first request
- ErrContentHasChanged = errors.New("content has changed since first request")
-
// seek errors no need to export them
errSeekInvalidWhence = errors.New("invalid whence")
errSeekOutsideRange = errors.New("outside of range")
@@ -106,21 +103,12 @@ func (r *Reader) prepareRequest() (*http.Request, error) {
return nil, ErrInvalidRange
}
- req, err := http.NewRequest("GET", r.Resource.URL, nil)
+ req, err := r.Resource.Request()
if err != nil {
return nil, err
}
req = req.WithContext(r.ctx)
-
- if r.Resource.ETag != "" {
- req.Header.Set("ETag", r.Resource.ETag)
- } else if r.Resource.LastModified != "" {
- // Last-Modified should be a fallback mechanism in case ETag is not present
- // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Last-Modified
- req.Header.Set("If-Range", r.Resource.LastModified)
- }
-
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", r.offset, r.rangeStart+r.rangeSize-1))
return req, nil
@@ -133,14 +121,17 @@ func (r *Reader) setResponse(res *http.Response) error {
// some servers return 200 OK for bytes=0-
// TODO: should we handle r.Resource.Last-Modified as well?
if r.offset > 0 || r.Resource.ETag != "" && r.Resource.ETag != res.Header.Get("ETag") {
- return ErrContentHasChanged
+ r.Resource.setError(ErrRangeRequestsNotSupported)
+ return ErrRangeRequestsNotSupported
}
case http.StatusNotFound:
+ r.Resource.setError(ErrNotFound)
return ErrNotFound
case http.StatusPartialContent:
// Requested `Range` request succeeded https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/206
break
case http.StatusRequestedRangeNotSatisfiable:
+ r.Resource.setError(ErrRangeRequestsNotSupported)
return ErrRangeRequestsNotSupported
default:
return fmt.Errorf("httprange: read response %d: %q", res.StatusCode, res.Status)
diff --git a/internal/httprange/http_reader_test.go b/internal/httprange/http_reader_test.go
index 9c8f11d0..97bfbf24 100644
--- a/internal/httprange/http_reader_test.go
+++ b/internal/httprange/http_reader_test.go
@@ -199,49 +199,61 @@ func TestSeekAndRead(t *testing.T) {
func TestReaderSetResponse(t *testing.T) {
tests := map[string]struct {
- status int
- offset int64
- prevETag string
- resEtag string
- expectedErrMsg string
+ status int
+ offset int64
+ prevETag string
+ resEtag string
+ expectedErrMsg string
+ expectedIsValid bool
}{
"partial_content_success": {
- status: http.StatusPartialContent,
+ status: http.StatusPartialContent,
+ expectedIsValid: true,
},
"status_ok_success": {
- status: http.StatusOK,
+ status: http.StatusOK,
+ expectedIsValid: true,
},
"status_ok_previous_response_invalid_offset": {
- status: http.StatusOK,
- offset: 1,
- expectedErrMsg: ErrContentHasChanged.Error(),
+ status: http.StatusOK,
+ offset: 1,
+ expectedErrMsg: ErrRangeRequestsNotSupported.Error(),
+ expectedIsValid: false,
},
"status_ok_previous_response_different_etag": {
- status: http.StatusOK,
- prevETag: "old",
- resEtag: "new",
- expectedErrMsg: ErrContentHasChanged.Error(),
+ status: http.StatusOK,
+ prevETag: "old",
+ resEtag: "new",
+ expectedErrMsg: ErrRangeRequestsNotSupported.Error(),
+ expectedIsValid: false,
},
"requested_range_not_satisfiable": {
- status: http.StatusRequestedRangeNotSatisfiable,
- expectedErrMsg: ErrRangeRequestsNotSupported.Error(),
+ status: http.StatusRequestedRangeNotSatisfiable,
+ expectedErrMsg: ErrRangeRequestsNotSupported.Error(),
+ expectedIsValid: false,
},
"not_found": {
- status: http.StatusNotFound,
- expectedErrMsg: ErrNotFound.Error(),
+ status: http.StatusNotFound,
+ expectedErrMsg: ErrNotFound.Error(),
+ expectedIsValid: false,
},
"unhandled_status_code": {
- status: http.StatusInternalServerError,
- expectedErrMsg: "httprange: read response 500:",
+ status: http.StatusInternalServerError,
+ expectedErrMsg: "httprange: read response 500:",
+ expectedIsValid: true,
},
}
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
- r := NewReader(context.Background(), &Resource{ETag: tt.prevETag}, tt.offset, 0)
+ resource := &Resource{ETag: tt.prevETag}
+ reader := NewReader(context.Background(), resource, tt.offset, 0)
res := &http.Response{StatusCode: tt.status, Header: map[string][]string{}}
res.Header.Set("ETag", tt.resEtag)
- err := r.setResponse(res)
+ err := reader.setResponse(res)
+
+ require.Equal(t, tt.expectedIsValid, resource.Valid())
+
if tt.expectedErrMsg != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tt.expectedErrMsg)
@@ -249,7 +261,6 @@ func TestReaderSetResponse(t *testing.T) {
}
require.NoError(t, err)
- require.Equal(t, r.res, res)
})
}
}
diff --git a/internal/httprange/resource.go b/internal/httprange/resource.go
index d2dbd340..8b908fe8 100644
--- a/internal/httprange/resource.go
+++ b/internal/httprange/resource.go
@@ -8,15 +8,63 @@ import (
"net/http"
"strconv"
"strings"
+ "sync/atomic"
)
// Resource represents any HTTP resource that can be read by a GET operation.
// It holds the resource's URL and metadata about it.
type Resource struct {
- URL string
ETag string
LastModified string
Size int64
+
+ url atomic.Value
+ err atomic.Value
+}
+
+func (r *Resource) URL() string {
+ url, _ := r.url.Load().(string)
+ return url
+}
+
+func (r *Resource) SetURL(url string) {
+ if r.URL() == url {
+ // We want to avoid cache lines invalidation
+ // on CPU due to value change
+ return
+ }
+
+ r.url.Store(url)
+}
+
+func (r *Resource) Err() error {
+ err, _ := r.err.Load().(error)
+ return err
+}
+
+func (r *Resource) Valid() bool {
+ return r.Err() == nil
+}
+
+func (r *Resource) setError(err error) {
+ r.err.Store(err)
+}
+
+func (r *Resource) Request() (*http.Request, error) {
+ req, err := http.NewRequest("GET", r.URL(), nil)
+ if err != nil {
+ return nil, err
+ }
+
+ if r.ETag != "" {
+ req.Header.Set("ETag", r.ETag)
+ } else if r.LastModified != "" {
+ // Last-Modified should be a fallback mechanism in case ETag is not present
+ // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Last-Modified
+ req.Header.Set("If-Range", r.LastModified)
+ }
+
+ return req, nil
}
func NewResource(ctx context.Context, url string) (*Resource, error) {
@@ -44,11 +92,12 @@ func NewResource(ctx context.Context, url string) (*Resource, error) {
}()
resource := &Resource{
- URL: url,
ETag: res.Header.Get("ETag"),
LastModified: res.Header.Get("Last-Modified"),
}
+ resource.SetURL(url)
+
switch res.StatusCode {
case http.StatusOK:
resource.Size = res.ContentLength
diff --git a/internal/httprange/resource_test.go b/internal/httprange/resource_test.go
index ace8f92c..1d6481fc 100644
--- a/internal/httprange/resource_test.go
+++ b/internal/httprange/resource_test.go
@@ -4,14 +4,21 @@ import (
"context"
"net/http"
"net/http/httptest"
+ "sync/atomic"
"testing"
"github.com/stretchr/testify/require"
)
+func urlValue(url string) atomic.Value {
+ v := atomic.Value{}
+ v.Store(url)
+ return v
+}
+
func TestNewResource(t *testing.T) {
- resource := Resource{
- URL: "/some/resource",
+ resource := &Resource{
+ url: urlValue("/some/resource"),
ETag: "etag",
LastModified: "Wed, 21 Oct 2015 07:28:00 GMT",
Size: 1,
@@ -21,7 +28,7 @@ func TestNewResource(t *testing.T) {
url string
status int
contentRange string
- want Resource
+ want *Resource
expectedErrMsg string
}{
"status_ok": {
@@ -33,37 +40,43 @@ func TestNewResource(t *testing.T) {
url: "/some/resource",
status: http.StatusPartialContent,
contentRange: "bytes 200-1000/67589",
- want: func() Resource {
- r := resource
- r.Size = 67589
- return r
- }(),
+ want: &Resource{
+ url: urlValue("/some/resource"),
+ ETag: "etag",
+ LastModified: "Wed, 21 Oct 2015 07:28:00 GMT",
+ Size: 67589,
+ },
},
"status_partial_content_invalid_content_range": {
url: "/some/resource",
status: http.StatusPartialContent,
contentRange: "invalid",
expectedErrMsg: "invalid `Content-Range`:",
+ want: resource,
},
"status_partial_content_content_range_not_a_number": {
url: "/some/resource",
status: http.StatusPartialContent,
contentRange: "bytes 200-1000/notanumber",
expectedErrMsg: "invalid `Content-Range`:",
+ want: resource,
},
"StatusRequestedRangeNotSatisfiable": {
url: "/some/resource",
status: http.StatusRequestedRangeNotSatisfiable,
expectedErrMsg: ErrRangeRequestsNotSupported.Error(),
+ want: resource,
},
"not_found": {
url: "/some/resource",
status: http.StatusNotFound,
expectedErrMsg: ErrNotFound.Error(),
+ want: resource,
},
"invalid_url": {
url: "/%",
expectedErrMsg: "invalid URL escape",
+ want: resource,
},
}
@@ -86,7 +99,7 @@ func TestNewResource(t *testing.T) {
}
require.NoError(t, err)
- require.Contains(t, got.URL, tt.want.URL)
+ require.Contains(t, got.URL(), tt.want.URL())
require.Equal(t, tt.want.LastModified, got.LastModified)
require.Equal(t, tt.want.ETag, got.ETag)
require.Equal(t, tt.want.Size, got.Size)
diff --git a/internal/vfs/zip/archive.go b/internal/vfs/zip/archive.go
index df175764..1137f004 100644
--- a/internal/vfs/zip/archive.go
+++ b/internal/vfs/zip/archive.go
@@ -32,13 +32,21 @@ var (
errNotFile = errors.New("not a file")
)
+type archiveStatus int
+
+const (
+ archiveOpening archiveStatus = iota
+ archiveOpenError
+ archiveOpened
+ archiveCorrupted
+)
+
// zipArchive implements the vfs.Root interface.
// It represents a zip archive saving all its files in memory.
// It holds an httprange.Resource that can be read with httprange.RangedReader in chunks.
type zipArchive struct {
fs *zipVFS
- path string
once sync.Once
done chan struct{}
openTimeout time.Duration
@@ -54,10 +62,9 @@ type zipArchive struct {
directories map[string]*zip.FileHeader
}
-func newArchive(fs *zipVFS, path string, openTimeout time.Duration) *zipArchive {
+func newArchive(fs *zipVFS, openTimeout time.Duration) *zipArchive {
return &zipArchive{
fs: fs,
- path: path,
done: make(chan struct{}),
files: make(map[string]*zip.File),
directories: make(map[string]*zip.FileHeader),
@@ -66,9 +73,14 @@ func newArchive(fs *zipVFS, path string, openTimeout time.Duration) *zipArchive
}
}
-func (a *zipArchive) openArchive(parentCtx context.Context) (err error) {
+func (a *zipArchive) openArchive(parentCtx context.Context, url string) (err error) {
+ // always try to update URL on resource
+ if a.resource != nil {
+ a.resource.SetURL(url)
+ }
+
// return early if openArchive was done already in a concurrent request
- if ok, err := a.openStatus(); ok {
+ if status, err := a.openStatus(); status != archiveOpening {
return err
}
@@ -78,7 +90,7 @@ func (a *zipArchive) openArchive(parentCtx context.Context) (err error) {
a.once.Do(func() {
// read archive once in its own routine with its own timeout
// if parentCtx is canceled, readArchive will continue regardless and will be cached in memory
- go a.readArchive()
+ go a.readArchive(url)
})
// wait for readArchive to be done or return if the parent context is canceled
@@ -100,14 +112,14 @@ func (a *zipArchive) openArchive(parentCtx context.Context) (err error) {
// readArchive creates an httprange.Resource that can read the archive's contents and stores a slice of *zip.Files
// that can be accessed later when calling any of th vfs.VFS operations
-func (a *zipArchive) readArchive() {
+func (a *zipArchive) readArchive(url string) {
defer close(a.done)
// readArchive with a timeout separate from openArchive's
ctx, cancel := context.WithTimeout(context.Background(), a.openTimeout)
defer cancel()
- a.resource, a.err = httprange.NewResource(ctx, a.path)
+ a.resource, a.err = httprange.NewResource(ctx, url)
if a.err != nil {
metrics.ZipOpened.WithLabelValues("error").Inc()
return
@@ -281,12 +293,20 @@ func (a *zipArchive) onEvicted() {
metrics.ZipArchiveEntriesCached.Sub(float64(len(a.files)))
}
-func (a *zipArchive) openStatus() (bool, error) {
+func (a *zipArchive) openStatus() (archiveStatus, error) {
select {
case <-a.done:
- return true, a.err
+ if a.err != nil {
+ return archiveOpenError, a.err
+ }
+
+ if a.resource != nil && a.resource.Err() != nil {
+ return archiveCorrupted, a.resource.Err()
+ }
+
+ return archiveOpened, nil
default:
- return false, nil
+ return archiveOpening, nil
}
}
diff --git a/internal/vfs/zip/archive_test.go b/internal/vfs/zip/archive_test.go
index 2474d419..e1b0d116 100644
--- a/internal/vfs/zip/archive_test.go
+++ b/internal/vfs/zip/archive_test.go
@@ -72,30 +72,107 @@ func TestOpen(t *testing.T) {
func TestOpenCached(t *testing.T) {
var requests int64
- zip, cleanup := openZipArchive(t, &requests)
+ testServerURL, cleanup := newZipFileServerURL(t, "group/zip.gitlab.io/public-without-dirs.zip", &requests)
defer cleanup()
- t.Run("open file first time", func(t *testing.T) {
- requestsStart := requests
- f, err := zip.Open(context.Background(), "index.html")
- require.NoError(t, err)
- defer f.Close()
+ fs := New()
+
+ // We use array instead of map to ensure
+ // predictable ordering of test execution
+ tests := []struct {
+ name string
+ vfsPath string
+ filePath string
+ expectedArchiveStatus archiveStatus
+ expectedOpenErr error
+ expectedReadErr error
+ expectedRequests int64
+ }{
+ {
+ name: "open file first time",
+ vfsPath: testServerURL + "/public.zip",
+ filePath: "index.html",
+ // we expect five requests to:
+ // read resource and zip metadata
+ // read file: data offset and content
+ expectedRequests: 5,
+ expectedArchiveStatus: archiveOpened,
+ },
+ {
+ name: "open file second time",
+ vfsPath: testServerURL + "/public.zip",
+ filePath: "index.html",
+ // we expect one request to read file with cached data offset
+ expectedRequests: 1,
+ expectedArchiveStatus: archiveOpened,
+ },
+ {
+ name: "when the URL changes",
+ vfsPath: testServerURL + "/public.zip?new-secret",
+ filePath: "index.html",
+ expectedRequests: 1,
+ expectedArchiveStatus: archiveOpened,
+ },
+ {
+ name: "when opening cached file and content changes",
+ vfsPath: testServerURL + "/public.zip?changed-content=1",
+ filePath: "index.html",
+ expectedRequests: 1,
+ // we receive an error on `read` as `open` offset is already cached
+ expectedReadErr: httprange.ErrRangeRequestsNotSupported,
+ expectedArchiveStatus: archiveCorrupted,
+ },
+ {
+ name: "after content change archive is reloaded",
+ vfsPath: testServerURL + "/public.zip?new-secret",
+ filePath: "index.html",
+ expectedRequests: 5,
+ expectedArchiveStatus: archiveOpened,
+ },
+ {
+ name: "when opening non-cached file and content changes",
+ vfsPath: testServerURL + "/public.zip?changed-content=1",
+ filePath: "subdir/hello.html",
+ expectedRequests: 1,
+ // we receive an error on `read` as `open` offset is already cached
+ expectedOpenErr: httprange.ErrRangeRequestsNotSupported,
+ expectedArchiveStatus: archiveCorrupted,
+ },
+ }
- _, err = ioutil.ReadAll(f)
- require.NoError(t, err)
- require.Equal(t, int64(2), atomic.LoadInt64(&requests)-requestsStart, "we expect two requests to read file: data offset and content")
- })
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ start := atomic.LoadInt64(&requests)
+ zip, err := fs.Root(context.Background(), test.vfsPath)
+ require.NoError(t, err)
- t.Run("open file second time", func(t *testing.T) {
- requestsStart := atomic.LoadInt64(&requests)
- f, err := zip.Open(context.Background(), "index.html")
- require.NoError(t, err)
- defer f.Close()
+ f, err := zip.Open(context.Background(), test.filePath)
+ if test.expectedOpenErr != nil {
+ require.Equal(t, test.expectedOpenErr, err)
+ status, _ := zip.(*zipArchive).openStatus()
+ require.Equal(t, test.expectedArchiveStatus, status)
+ return
+ }
- _, err = ioutil.ReadAll(f)
- require.NoError(t, err)
- require.Equal(t, int64(1), atomic.LoadInt64(&requests)-requestsStart, "we expect one request to read file with cached data offset")
- })
+ require.NoError(t, err)
+ defer f.Close()
+
+ _, err = ioutil.ReadAll(f)
+ if test.expectedReadErr != nil {
+ require.Equal(t, test.expectedReadErr, err)
+ status, _ := zip.(*zipArchive).openStatus()
+ require.Equal(t, test.expectedArchiveStatus, status)
+ return
+ }
+
+ require.NoError(t, err)
+ status, _ := zip.(*zipArchive).openStatus()
+ require.Equal(t, test.expectedArchiveStatus, status)
+
+ end := atomic.LoadInt64(&requests)
+ require.Equal(t, test.expectedRequests, end-start)
+ })
+ }
}
func TestLstat(t *testing.T) {
@@ -246,11 +323,11 @@ func TestArchiveCanBeReadAfterOpenCtxCanceled(t *testing.T) {
defer cleanup()
fs := New().(*zipVFS)
- zip := newArchive(fs, testServerURL+"/public.zip", time.Second)
+ zip := newArchive(fs, time.Second)
ctx, cancel := context.WithCancel(context.Background())
cancel()
- err := zip.openArchive(ctx)
+ err := zip.openArchive(ctx, testServerURL+"/public.zip")
require.EqualError(t, err, context.Canceled.Error())
<-zip.done
@@ -269,9 +346,9 @@ func TestReadArchiveFails(t *testing.T) {
defer cleanup()
fs := New().(*zipVFS)
- zip := newArchive(fs, testServerURL+"/unkown.html", time.Second)
+ zip := newArchive(fs, time.Second)
- err := zip.openArchive(context.Background())
+ err := zip.openArchive(context.Background(), testServerURL+"/unkown.html")
require.Error(t, err)
require.Contains(t, err.Error(), httprange.ErrNotFound.Error())
@@ -289,9 +366,9 @@ func openZipArchive(t *testing.T, requests *int64) (*zipArchive, func()) {
testServerURL, cleanup := newZipFileServerURL(t, "group/zip.gitlab.io/public-without-dirs.zip", requests)
fs := New().(*zipVFS)
- zip := newArchive(fs, testServerURL+"/public.zip", time.Second)
+ zip := newArchive(fs, time.Second)
- err := zip.openArchive(context.Background())
+ err := zip.openArchive(context.Background(), testServerURL+"/public.zip")
require.NoError(t, err)
// public/ public/index.html public/404.html public/symlink.html
@@ -312,10 +389,18 @@ func newZipFileServerURL(t *testing.T, zipFilePath string, requests *int64) (str
m := http.NewServeMux()
m.HandleFunc("/public.zip", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- http.ServeFile(w, r, zipFilePath)
if requests != nil {
atomic.AddInt64(requests, 1)
}
+
+ r.ParseForm()
+
+ if changedContent := r.Form.Get("changed-content"); changedContent != "" {
+ w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
+ return
+ }
+
+ http.ServeFile(w, r, zipFilePath)
}))
testServer := httptest.NewServer(m)
diff --git a/internal/vfs/zip/vfs.go b/internal/vfs/zip/vfs.go
index 3b69d1e9..692a4a69 100644
--- a/internal/vfs/zip/vfs.go
+++ b/internal/vfs/zip/vfs.go
@@ -110,6 +110,20 @@ func New(options ...Option) vfs.VFS {
return zipVFS
}
+func (fs *zipVFS) keyFromPath(path string) (string, error) {
+ // We assume that our URL is https://.../artifacts.zip?content-sign=aaa
+ // our caching key is `https://.../artifacts.zip`
+ // TODO: replace caching key with file_sha256
+ // https://gitlab.com/gitlab-org/gitlab-pages/-/issues/489
+ key, err := url.Parse(path)
+ if err != nil {
+ return "", err
+ }
+ key.RawQuery = ""
+ key.Fragment = ""
+ return key.String(), nil
+}
+
// Root opens an archive given a URL path and returns an instance of zipArchive
// that implements the vfs.VFS interface.
// To avoid using locks, the findOrOpenArchive function runs inside of a for
@@ -118,14 +132,14 @@ func New(options ...Option) vfs.VFS {
// to try and find the cached archive or return if there's an error, for example
// if the context is canceled.
func (fs *zipVFS) Root(ctx context.Context, path string) (vfs.Root, error) {
- urlPath, err := url.Parse(path)
+ key, err := fs.keyFromPath(path)
if err != nil {
return nil, err
}
// we do it in loop to not use any additional locks
for {
- root, err := fs.findOrOpenArchive(ctx, urlPath.String())
+ root, err := fs.findOrOpenArchive(ctx, key, path)
if err == errAlreadyCached {
continue
}
@@ -147,34 +161,53 @@ func (fs *zipVFS) Name() string {
// otherwise creates the archive entry in a cache and try to save it,
// if saving fails it's because the archive has already been cached
// (e.g. by another concurrent request)
-func (fs *zipVFS) findOrCreateArchive(ctx context.Context, path string) (*zipArchive, error) {
+func (fs *zipVFS) findOrCreateArchive(ctx context.Context, key string) (*zipArchive, error) {
// This needs to happen in lock to ensure that
// concurrent access will not remove it
// it is needed due to the bug https://github.com/patrickmn/go-cache/issues/48
fs.cacheLock.Lock()
defer fs.cacheLock.Unlock()
- archive, expiry, found := fs.cache.GetWithExpiration(path)
+ archive, expiry, found := fs.cache.GetWithExpiration(key)
if found {
- metrics.ZipCacheRequests.WithLabelValues("archive", "hit").Inc()
+ status, _ := archive.(*zipArchive).openStatus()
+ switch status {
+ case archiveOpening:
+ metrics.ZipCacheRequests.WithLabelValues("archive", "hit-opening").Inc()
- if opened, err := archive.(*zipArchive).openStatus(); opened && err == nil {
+ case archiveOpenError:
+ // this means that archive is likely corrupted
+ // we keep it for duration of cache entry expiry (negative cache)
+ metrics.ZipCacheRequests.WithLabelValues("archive", "hit-open-error").Inc()
+
+ case archiveOpened:
if time.Until(expiry) < fs.cacheRefreshInterval {
- // refresh item that has been opened successfully
- fs.cache.SetDefault(path, archive)
+ fs.cache.SetDefault(key, archive)
+ metrics.ZipCacheRequests.WithLabelValues("archive", "hit-refresh").Inc()
+ } else {
+ metrics.ZipCacheRequests.WithLabelValues("archive", "hit").Inc()
}
+
+ case archiveCorrupted:
+ // this means that archive is likely changed
+ // we should invalidate it immediately
+ metrics.ZipCacheRequests.WithLabelValues("archive", "corrupted").Inc()
+ archive = nil
}
- } else {
- archive = newArchive(fs, path, fs.openTimeout)
+ }
+
+ if archive == nil {
+ archive = newArchive(fs, fs.openTimeout)
// We call delete to ensure that expired item
// is properly evicted as there's a bug in a cache library:
// https://github.com/patrickmn/go-cache/issues/48
- fs.cache.Delete(path)
+ fs.cache.Delete(key)
// if adding the archive to the cache fails it means it's already been added before
// this is done to find concurrent additions.
- if fs.cache.Add(path, archive, fs.cacheExpirationInterval) != nil {
+ if fs.cache.Add(key, archive, fs.cacheExpirationInterval) != nil {
+ metrics.ZipCacheRequests.WithLabelValues("archive", "already-cached").Inc()
return nil, errAlreadyCached
}
@@ -186,13 +219,13 @@ func (fs *zipVFS) findOrCreateArchive(ctx context.Context, path string) (*zipArc
}
// findOrOpenArchive gets archive from cache and tries to open it
-func (fs *zipVFS) findOrOpenArchive(ctx context.Context, path string) (*zipArchive, error) {
- zipArchive, err := fs.findOrCreateArchive(ctx, path)
+func (fs *zipVFS) findOrOpenArchive(ctx context.Context, key, path string) (*zipArchive, error) {
+ zipArchive, err := fs.findOrCreateArchive(ctx, key)
if err != nil {
return nil, err
}
- err = zipArchive.openArchive(ctx)
+ err = zipArchive.openArchive(ctx, path)
if err != nil {
return nil, err
}
diff --git a/internal/vfs/zip/vfs_test.go b/internal/vfs/zip/vfs_test.go
index 8a5e77a8..dff2ff43 100644
--- a/internal/vfs/zip/vfs_test.go
+++ b/internal/vfs/zip/vfs_test.go
@@ -96,7 +96,7 @@ func TestVFSFindOrOpenArchiveConcurrentAccess(t *testing.T) {
}()
require.Eventually(t, func() bool {
- _, err := vfs.findOrOpenArchive(context.Background(), path)
+ _, err := vfs.findOrOpenArchive(context.Background(), path, path)
return err == errAlreadyCached
}, time.Second, time.Nanosecond)
}
@@ -166,7 +166,7 @@ func TestVFSFindOrOpenArchiveRefresh(t *testing.T) {
path := testServerURL + test.path
// create a new archive and increase counters
- archive1, err1 := vfs.findOrOpenArchive(context.Background(), path)
+ archive1, err1 := vfs.findOrOpenArchive(context.Background(), path, path)
if test.expectOpenError {
require.Error(t, err1)
require.Nil(t, archive1)
@@ -182,7 +182,7 @@ func TestVFSFindOrOpenArchiveRefresh(t *testing.T) {
if test.expectNewArchive {
// should return a new archive
- archive2, err2 := vfs.findOrOpenArchive(context.Background(), path)
+ archive2, err2 := vfs.findOrOpenArchive(context.Background(), path, path)
if test.expectOpenError {
require.Error(t, err2)
require.Nil(t, archive2)
@@ -194,7 +194,7 @@ func TestVFSFindOrOpenArchiveRefresh(t *testing.T) {
}
// should return exactly the same archive
- archive2, err2 := vfs.findOrOpenArchive(context.Background(), path)
+ archive2, err2 := vfs.findOrOpenArchive(context.Background(), path, path)
require.Equal(t, archive1, archive2, "same archive is returned")
require.Equal(t, err1, err2, "same error for the same archive")