Welcome to mirror list, hosted at ThFree Co, Russian Federation.

vfs.go « zip « vfs « internal - gitlab.com/gitlab-org/gitlab-pages.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: d2db160610dca2ff69bd542866884d594dce92f2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
package zip

import (
	"context"
	"errors"
	"io/fs"
	"net/http"
	"net/url"
	"sync"
	"time"

	"github.com/patrickmn/go-cache"

	"gitlab.com/gitlab-org/gitlab-pages/internal/config"
	"gitlab.com/gitlab-org/gitlab-pages/internal/httpfs"
	"gitlab.com/gitlab-org/gitlab-pages/internal/httprange"
	"gitlab.com/gitlab-org/gitlab-pages/internal/httptransport"
	"gitlab.com/gitlab-org/gitlab-pages/internal/lru"
	"gitlab.com/gitlab-org/gitlab-pages/internal/vfs"
	"gitlab.com/gitlab-org/gitlab-pages/metrics"
)

const (
	// Sizing for the data-offset LRU cache (see dataOffsetCache on zipVFS).
	// we assume that each item costs around 100 bytes
	// this gives around 5MB of raw memory needed without acceleration structures
	defaultDataOffsetItems              = 50000
	defaultDataOffsetExpirationInterval = time.Hour

	// Sizing for the readlink LRU cache (see readlinkCache on zipVFS).
	// we assume that each item costs around 200 bytes
	// this gives around 2MB of raw memory needed without acceleration structures
	defaultReadlinkItems              = 10000
	defaultReadlinkExpirationInterval = time.Hour
)

var (
	// errAlreadyCached is returned by findOrCreateArchive when a concurrent
	// request stored an archive under the same key first; Root treats it as
	// a signal to retry the cache lookup rather than as a failure.
	errAlreadyCached = errors.New("archive already cached")
)

// lruCache is the subset of the internal/lru cache API that zipVFS relies on
// to memoize per-archive lookups (data offsets and readlink targets).
type lruCache interface {
	FindOrFetch(cacheNamespace, key string, fetchFn func() (interface{}, error)) (interface{}, error)
}

// zipVFS is a simple cached implementation of the vfs.VFS interface
type zipVFS struct {
	// cache maps a cache key to a *zipArchive. cacheLock guards all
	// read-modify-write sequences on it (see findOrCreateArchive).
	cache     *cache.Cache
	cacheLock sync.Mutex

	openTimeout             time.Duration // passed to newArchive as the open deadline
	cacheExpirationInterval time.Duration // TTL of an archive cache entry
	cacheRefreshInterval    time.Duration // remaining TTL below which a hit re-sets the entry
	cacheCleanupInterval    time.Duration // go-cache janitor interval for purging expired entries

	// Auxiliary LRU caches; the data-offset one is slated for removal,
	// see https://gitlab.com/gitlab-org/gitlab-pages/-/issues/480
	dataOffsetCache lruCache
	readlinkCache   lruCache

	// the `int64` needs to be 64bit aligned on some 32bit systems
	// https://gitlab.com/gitlab-org/gitlab/-/issues/337261
	archiveCount *int64
	httpClient   *http.Client
}

// New creates a zipVFS instance that can be used by a serving request.
func New(cfg *config.ZipServing) vfs.VFS {
	// One shared client for all range requests against archives.
	client := &http.Client{
		// TODO: make this timeout configurable
		// https://gitlab.com/gitlab-org/gitlab-pages/-/issues/457
		Timeout: 30 * time.Minute,
		Transport: httptransport.NewMeteredRoundTripper(
			httptransport.NewTransport(),
			"zip_vfs",
			metrics.HTTPRangeTraceDuration,
			metrics.HTTPRangeRequestDuration,
			metrics.HTTPRangeRequestsTotal,
			httptransport.DefaultTTFBTimeout,
		),
	}

	z := &zipVFS{
		openTimeout:             cfg.OpenTimeout,
		cacheExpirationInterval: cfg.ExpirationInterval,
		cacheRefreshInterval:    cfg.RefreshInterval,
		cacheCleanupInterval:    cfg.CleanupInterval,
		httpClient:              client,
		archiveCount:            new(int64),
	}

	z.resetCache()

	// TODO: To be removed with https://gitlab.com/gitlab-org/gitlab-pages/-/issues/480
	z.dataOffsetCache = lru.New(
		"data-offset",
		lru.WithMaxSize(defaultDataOffsetItems),
		lru.WithExpirationInterval(defaultDataOffsetExpirationInterval),
		lru.WithCachedEntriesMetric(metrics.ZipCachedEntries),
		lru.WithCachedRequestsMetric(metrics.ZipCacheRequests),
	)
	z.readlinkCache = lru.New(
		"readlink",
		lru.WithMaxSize(defaultReadlinkItems),
		lru.WithExpirationInterval(defaultReadlinkExpirationInterval),
		lru.WithCachedEntriesMetric(metrics.ZipCachedEntries),
		lru.WithCachedRequestsMetric(metrics.ZipCacheRequests),
	)

	return z
}

// Reconfigure updates the zipVFS configuration values, re-registers the
// transport's file protocol, and resets the archive cache.
func (zfs *zipVFS) Reconfigure(cfg *config.Config) error {
	zfs.cacheLock.Lock()
	defer zfs.cacheLock.Unlock()

	zipCfg := cfg.Zip
	zfs.openTimeout = zipCfg.OpenTimeout
	zfs.cacheExpirationInterval = zipCfg.ExpirationInterval
	zfs.cacheRefreshInterval = zipCfg.RefreshInterval
	zfs.cacheCleanupInterval = zipCfg.CleanupInterval

	if err := zfs.reconfigureTransport(cfg); err != nil {
		return err
	}

	zfs.resetCache()

	return nil
}

// reconfigureTransport registers a "file" protocol handler on the HTTP
// client's transport, restricted to the configured allowed paths.
func (zfs *zipVFS) reconfigureTransport(cfg *config.Config) error {
	fsPath, err := httpfs.NewFileSystemPath(cfg.Zip.AllowedPaths)
	if err != nil {
		return err
	}

	// NOTE(review): assumes the client transport is always an
	// httptransport.Transport; the assertion panics otherwise.
	transport := zfs.httpClient.Transport.(httptransport.Transport)
	transport.RegisterProtocol("file", http.NewFileTransport(fsPath))

	return nil
}

// resetCache replaces the archive cache with a fresh one using the current
// expiration/cleanup intervals and wires up the eviction metrics hook.
func (zfs *zipVFS) resetCache() {
	fresh := cache.New(zfs.cacheExpirationInterval, zfs.cacheCleanupInterval)
	fresh.OnEvicted(func(key string, value interface{}) {
		metrics.ZipCachedEntries.WithLabelValues("archive").Dec()

		value.(*zipArchive).onEvicted()
	})
	zfs.cache = fresh
}

// keyFromPath derives the caching key for an archive URL by stripping the
// query string and fragment, e.g. https://.../artifacts.zip?content-sign=aaa
// is cached under `https://.../artifacts.zip`.
// TODO: replace caching key with file_sha256
// https://gitlab.com/gitlab-org/gitlab-pages/-/issues/489
func keyFromPath(path string) (string, error) {
	u, err := url.Parse(path)
	if err != nil {
		return "", err
	}

	u.RawQuery, u.Fragment = "", ""

	return u.String(), nil
}

// Root opens an archive given a URL path and returns an instance of zipArchive
// that implements the vfs.VFS interface.
// To avoid using locks, the findOrOpenArchive function runs inside of a for
// loop until an archive is either found or created and saved.
// If findOrOpenArchive returns errAlreadyCached, the for loop will continue
// to try and find the cached archive or return if there's an error, for example
// if the context is canceled.
func (zfs *zipVFS) Root(ctx context.Context, path string, cacheKey string) (vfs.Root, error) {
	// TODO: update acceptance tests mocked response to return a cacheKey
	if cacheKey == "" {
		k, err := keyFromPath(path)
		if err != nil {
			return nil, err
		}

		cacheKey = k
	}

	// we do it in loop to not use any additional locks
	for {
		root, err := zfs.findOrOpenArchive(ctx, cacheKey, path)
		if errors.Is(err, errAlreadyCached) {
			// A concurrent request cached this archive first; retry the
			// lookup, but stop looping once the request is canceled so a
			// persistent race cannot spin past cancellation.
			if ctxErr := ctx.Err(); ctxErr != nil {
				return nil, ctxErr
			}
			continue
		}

		// If archive is not found, return a known `vfs` error
		if errors.Is(err, httprange.ErrNotFound) {
			return nil, fs.ErrNotExist
		}

		return root, err
	}
}

// Name returns the identifier of this VFS implementation.
func (zfs *zipVFS) Name() string {
	const vfsName = "zip"
	return vfsName
}

// findOrCreateArchive returns the archive cached under key, refreshing its
// cache entry when it is close to expiry; otherwise it creates a new (not yet
// opened) archive entry and tries to save it. If saving fails, the archive has
// already been cached (e.g. by another concurrent request) and
// errAlreadyCached is returned so the caller can retry the lookup.
func (zfs *zipVFS) findOrCreateArchive(ctx context.Context, key string) (*zipArchive, error) {
	// This needs to happen in lock to ensure that
	// concurrent access will not remove it
	// it is needed due to the bug https://github.com/patrickmn/go-cache/issues/48
	zfs.cacheLock.Lock()
	defer zfs.cacheLock.Unlock()

	archive, expiry, found := zfs.cache.GetWithExpiration(key)
	if found {
		status, _ := archive.(*zipArchive).openStatus()
		switch status {
		case archiveOpening:
			// another request is already opening this archive; reuse its entry
			metrics.ZipCacheRequests.WithLabelValues("archive", "hit-opening").Inc()

		case archiveOpenError:
			// this means that archive is likely corrupted
			// we keep it for duration of cache entry expiry (negative cache)
			metrics.ZipCacheRequests.WithLabelValues("archive", "hit-open-error").Inc()

		case archiveOpened:
			// extend the lifetime of entries that are close to expiring by
			// re-setting them with the default (full) expiration
			if time.Until(expiry) < zfs.cacheRefreshInterval {
				zfs.cache.SetDefault(key, archive)
				metrics.ZipCacheRequests.WithLabelValues("archive", "hit-refresh").Inc()
			} else {
				metrics.ZipCacheRequests.WithLabelValues("archive", "hit").Inc()
			}

		case archiveCorrupted:
			// this means that archive is likely changed
			// we should invalidate it immediately
			metrics.ZipCacheRequests.WithLabelValues("archive", "corrupted").Inc()
			archive = nil
		}
	}

	// not found, or invalidated above: create a fresh (unopened) entry
	if archive == nil {
		archive = newArchive(zfs, zfs.openTimeout)

		// We call delete to ensure that expired item
		// is properly evicted as there's a bug in a cache library:
		// https://github.com/patrickmn/go-cache/issues/48
		zfs.cache.Delete(key)

		// if adding the archive to the cache fails it means it's already been added before
		// this is done to find concurrent additions.
		if zfs.cache.Add(key, archive, zfs.cacheExpirationInterval) != nil {
			metrics.ZipCacheRequests.WithLabelValues("archive", "already-cached").Inc()
			return nil, errAlreadyCached
		}

		metrics.ZipCacheRequests.WithLabelValues("archive", "miss").Inc()
		metrics.ZipCachedEntries.WithLabelValues("archive").Inc()
	}

	return archive.(*zipArchive), nil
}

// findOrOpenArchive retrieves the archive for key from the cache (creating
// its entry if needed) and ensures it is opened before returning it.
func (zfs *zipVFS) findOrOpenArchive(ctx context.Context, key, path string) (*zipArchive, error) {
	archive, err := zfs.findOrCreateArchive(ctx, key)
	if err != nil {
		return nil, err
	}

	if err := archive.openArchive(ctx, path); err != nil {
		return nil, err
	}

	return archive, nil
}