Welcome to mirror list, hosted at ThFree Co, Russian Federation.

cache.go « streamcache « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: ad365c2f69e0b16b071671356836088b9b6cdd4f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
// Package streamcache provides a cache for large blobs (in the order of
// gigabytes). Because storing gigabytes of data is slow, cache entries
// can be streamed on the read end before they have finished on the write
// end. Because storing gigabytes of data is expensive, cache entries
// have a back pressure mechanism: if the readers don't make progress
// reading the data, the writers will block. That way our disk can fill
// up no faster than our readers can read from the cache.
//
// The cache has 3 main parts: Cache (in-memory index), filestore (files
// to store the cached data in because it does not fit in memory), and
// pipe (coordinated IO to one file between one writer and multiple
// readers). A cache entry consists of a key, a maximum age, a
// pipe and the error result of the thing writing to the pipe.
//
// # Eviction
//
// There are two eviction goroutines: one for Cache and one for filestore.
// The Cache eviction goroutine evicts entries after a set amount of time,
// and deletes their underlying files too. This is safe because Unix file
// semantics guarantee that readers/writers that are still using those
// files can keep using them. In addition to evicting known cache
// entries, we also have a goroutine at the filestore level which
// performs a directory walk. This will clean up cache files left behind
// by other processes.
package streamcache

import (
	"context"
	"fmt"
	"io"
	"os"
	"strconv"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"gitlab.com/gitlab-org/gitaly/v16/internal/dontpanic"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
	"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
	"gitlab.com/gitlab-org/gitaly/v16/internal/log"
)

var (
	// cacheIndexSize is a gauge, labelled by cache directory, that
	// cache.setIndexSize sets to the current number of entries in the
	// in-memory index.
	cacheIndexSize = promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "gitaly_streamcache_index_entries",
			Help: "Number of index entries in streamcache",
		},
		[]string{"dir"},
	)

	// packObjectsCacheEnabled is a gauge, labelled by cache directory and
	// maximum entry age in whole seconds, that New sets to 1 when the
	// stream cache is enabled in the configuration.
	packObjectsCacheEnabled = promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "gitaly_pack_objects_cache_enabled",
			Help: "If set to 1, indicates that the cache for PackObjectsHook has been enabled in this process",
		},
		[]string{"dir", "max_age"},
	)
)

// Cache is a cache for large byte streams. This file provides three
// implementations: the file-backed cache returned by New when caching is
// enabled, NullCache (every lookup is a miss), and TestLoggingCache (a
// recording wrapper for tests).
type Cache interface {
	// Fetch finds or creates a cache entry and writes its contents into dst.
	// If the create callback is called the created return value is true. In
	// case of a non-nil error return, the create callback may still be
	// running in a goroutine for the benefit of another caller of Fetch with
	// the same key.
	//
	// written reports how many bytes were copied into dst. create receives
	// the writer that the entry's data must be produced on and returns the
	// error result of producing it.
	Fetch(ctx context.Context, key string, dst io.Writer, create func(io.Writer) error) (written int64, created bool, err error)
	// Stop stops the cleanup goroutines of the cache.
	Stop()
}

// Compile-time check that *TestLoggingCache implements Cache.
var _ Cache = (*TestLoggingCache)(nil)

// TestLogEntry records the result of a cache lookup for testing purposes.
// TestLoggingCache.Fetch appends one of these per call.
type TestLogEntry struct {
	// Key is the cache key that was passed to Fetch.
	Key string
	// Created is the created flag that the wrapped Fetch returned.
	Created bool
	// Err is the error that the wrapped Fetch returned, if any.
	Err error
}

// TestLoggingCache wraps a real Cache and logs all its lookups. This is
// not suitable for production because the log will grow indefinitely.
// Use only for testing.
type TestLoggingCache struct {
	// Cache is the wrapped implementation. Fetch delegates to it, and Stop
	// is promoted from it unchanged.
	Cache
	// entries is the append-only log of lookups, guarded by m.
	entries []*TestLogEntry
	// m guards entries.
	m sync.Mutex
}

// Fetch delegates to the wrapped Cache and then appends a TestLogEntry
// holding the key, the created flag and the error of that lookup. The
// results of the wrapped call are returned unchanged.
func (tlc *TestLoggingCache) Fetch(ctx context.Context, key string, dst io.Writer, create func(io.Writer) error) (written int64, created bool, err error) {
	written, created, err = tlc.Cache.Fetch(ctx, key, dst, create)
	record := &TestLogEntry{Key: key, Created: created, Err: err}

	// Only the append needs the lock; the wrapped Fetch runs outside of it.
	tlc.m.Lock()
	tlc.entries = append(tlc.entries, record)
	tlc.m.Unlock()

	return written, created, err
}

// Entries returns the log of lookups observed so far. The returned slice
// shares its backing array and elements with the cache's own log, so the
// caller must treat both the slice and the entries as read-only.
func (tlc *TestLoggingCache) Entries() []*TestLogEntry {
	tlc.m.Lock()
	logged := tlc.entries
	tlc.m.Unlock()

	return logged
}

// Compile-time check that NullCache implements Cache.
var _ Cache = NullCache{}

// NullCache is a null implementation of Cache. Every lookup is a miss,
// and it uses no storage. New returns it when the stream cache is not
// enabled in the configuration.
type NullCache struct{}

// Fetch runs create against dst, wrapped in a counting writer, and reports
// the count taken from that writer as written. Nothing is cached, so ctx and
// key are unused and the created flag is always true. The error is whatever
// create returned.
func (NullCache) Fetch(ctx context.Context, key string, dst io.Writer, create func(io.Writer) error) (written int64, created bool, err error) {
	counter := helper.CountingWriter{W: dst}
	createErr := create(&counter)

	return counter.N, true, createErr
}

// Stop is a no-op. It exists so that NullCache satisfies the Cache
// interface.
func (NullCache) Stop() {}

// cache is the file-backed Cache implementation. It keeps an in-memory
// index from key to entry; the entry data itself lives in files obtained
// from createFile.
type cache struct {
	// m guards index.
	m sync.Mutex
	// maxAge is the age after which clean evicts an entry from the index.
	// It is also the requested period of the cleanup loop.
	maxAge time.Duration
	// index maps cache keys to their current entry.
	index map[string]*entry
	// createFile returns a new named file for an entry to write into.
	// newCacheWithSleep sets it to the filestore's Create method.
	createFile func() (namedWriteCloser, error)
	// stop is closed by Stop to shut down the background goroutines.
	stop chan struct{}
	// stopOnce ensures stop is closed at most once.
	stopOnce sync.Once
	logger     log.Logger
	// dir is the cache directory; it is used as the label of the
	// cacheIndexSize metric.
	dir string
	// sleepLoop runs the periodic clean loop and is cancelled once stop is
	// closed.
	sleepLoop *dontpanic.Forever

	// removalCond is a condition that gets signalled after files have been removed from disk.
	// This field is optional and should only be used for tests.
	removalCond *sync.Cond
}

// New returns a Cache built from cfg. When the cache is disabled it returns
// a NullCache. Otherwise it sets the gitaly_pack_objects_cache_enabled gauge
// to 1 for cfg.Dir and the configured maximum age in whole seconds, and
// returns a file-backed cache wrapped in a minOccurrences filter that is
// configured with cfg.MinOccurrences and the same maximum age.
func New(cfg config.StreamCacheConfig, logger log.Logger) Cache {
	if !cfg.Enabled {
		return NullCache{}
	}

	maxAge := cfg.MaxAge.Duration()
	maxAgeLabel := strconv.Itoa(int(maxAge.Seconds()))
	packObjectsCacheEnabled.WithLabelValues(cfg.Dir, maxAgeLabel).Set(1)

	return &minOccurrences{
		N:      cfg.MinOccurrences,
		MinAge: maxAge,
		Cache:  newCacheWithSleep(cfg.Dir, maxAge, time.After, time.After, logger),
	}
}

// newCacheWithSleep builds a cache that stores its files in dir through a
// new filestore, and starts its background goroutines. filestoreSleep is
// handed to the filestore and cleanSleep paces the index cleanup loop; both
// are injectable timing sources (New passes time.After for each).
//
// The returned cache keeps running until its Stop method is called.
func newCacheWithSleep(
	dir string,
	maxAge time.Duration,
	filestoreSleep func(time.Duration) <-chan time.Time,
	cleanSleep func(time.Duration) <-chan time.Time,
	logger log.Logger,
) *cache {
	fs := newFilestore(dir, maxAge, filestoreSleep, logger)

	c := &cache{
		maxAge:     maxAge,
		index:      make(map[string]*entry),
		createFile: fs.Create,
		stop:       make(chan struct{}),
		logger:     logger,
		dir:        dir,
		sleepLoop:  dontpanic.NewForever(logger, time.Minute),
	}

	// Periodically evict expired index entries until c.stop is closed.
	c.sleepLoop.Go(func() {
		sleepLoop(c.stop, c.maxAge, cleanSleep, c.clean)
	})
	// Shutdown watcher: once Stop closes c.stop, cancel the cleanup loop
	// and stop the filestore as well.
	go func() {
		<-c.stop
		c.sleepLoop.Cancel()
		fs.Stop()
	}()

	return c
}

// Stop closes the stop channel, which signals the background goroutines
// started by newCacheWithSleep to shut down. The close is guarded by
// stopOnce, so calling Stop more than once is safe.
func (c *cache) Stop() {
	c.stopOnce.Do(func() { close(c.stop) })
}

// clean evicts every index entry that was created more than maxAge ago and
// then removes the backing files of the evicted entries in a separate
// goroutine, so that the file removals happen without holding the mutex.
func (c *cache) clean() {
	c.m.Lock()
	defer c.m.Unlock()

	threshold := time.Now().Add(-c.maxAge)

	var expired []*entry
	for key, ent := range c.index {
		if !ent.created.Before(threshold) {
			continue
		}

		c.delete(key)
		expired = append(expired, ent)
	}

	// The goroutine is started even when nothing expired, so that
	// removalCond (if set) is broadcast after every clean run.
	go func() {
		for _, ent := range expired {
			err := ent.pipe.RemoveFile()
			if err == nil || os.IsNotExist(err) {
				continue
			}

			c.logger.WithError(err).Error("streamcache: remove file evicted from index")
		}

		if c.removalCond == nil {
			return
		}

		c.removalCond.L.Lock()
		defer c.removalCond.L.Unlock()
		c.removalCond.Broadcast()
	}()
}

// delete removes key from the index and updates the index size metric. It
// does not take c.m itself; the callers in this file hold the mutex while
// calling it.
func (c *cache) delete(key string) {
	delete(c.index, key)
	c.setIndexSize()
}

// setIndexSize publishes the current number of index entries on the
// cacheIndexSize gauge, labelled with the cache directory. It reads c.index
// without locking, so it is meant to be called with c.m held.
func (c *cache) setIndexSize() {
	cacheIndexSize.WithLabelValues(c.dir).Set(float64(len(c.index)))
}

// Fetch looks up key, creating the entry via create if needed, and copies
// the entry's stream into dst. written is the number of bytes copied and
// created reports whether this call created the entry.
//
// Once the copy has succeeded, Fetch waits for the result of the create
// callback, or for ctx to be done, and returns that as err. If obtaining the
// stream or copying it fails, that error is returned without waiting.
func (c *cache) Fetch(ctx context.Context, key string, dst io.Writer, create func(io.Writer) error) (written int64, created bool, err error) {
	stream, result, created, err := c.getStream(key, create)
	if err != nil {
		return written, created, err
	}
	defer stream.Close()

	if written, err = io.Copy(dst, stream); err != nil {
		return written, created, err
	}

	err = result.Wait(ctx)
	return written, created, err
}

// getStream returns a reader and the waiter for the entry stored under key.
// If the index holds an entry whose pipe can still be opened, that entry is
// reused and created is false. Otherwise a new entry is created with
// create, stored in the index, and created is true.
func (c *cache) getStream(key string, create func(io.Writer) error) (_ io.ReadCloser, _ *waiter, created bool, err error) {
	c.m.Lock()
	defer c.m.Unlock()

	if existing := c.index[key]; existing != nil {
		reader, openErr := existing.pipe.OpenReader()
		if openErr == nil {
			return reader, existing.waiter, false, nil
		}

		// Failing to open the reader is allowed to happen, for instance
		// when the filestore cleanup goroutine has already deleted the
		// file. Drop the stale key so that the next caller does not have to
		// try this entry again.
		c.delete(key)
	}

	reader, fresh, err := c.newEntry(key, create)
	if err != nil {
		return nil, nil, false, err
	}

	c.index[key] = fresh
	c.setIndexSize()

	return reader, fresh.waiter, true, nil
}

// entry is a single cache entry as stored in the cache index.
type entry struct {
	// key is the cache key the entry was created for.
	key string
	// cache is the cache that created the entry.
	cache *cache
	// pipe coordinates IO on the entry's backing file. It is used to open
	// readers and to remove the file on eviction.
	pipe *pipe
	// created is the creation time; clean compares it against maxAge.
	created time.Time
	// waiter receives the error result of the create callback.
	waiter *waiter
}

// newEntry creates a cache entry for key that is backed by a newly created
// file, and starts create in a goroutine that writes into the entry's pipe.
// It returns the reader obtained from newPipe together with the entry.
//
// newEntry does not store the entry in c.index; that is up to the caller.
// The goroutine takes c.m on failure, so a caller that holds c.m (as
// getStream does) has stored the entry before the failure path can run.
//
// If creating the file or the pipe fails, the error is returned and the file
// (if any) is closed.
func (c *cache) newEntry(key string, create func(io.Writer) error) (_ io.ReadCloser, _ *entry, err error) {
	e := &entry{
		key:     key,
		cache:   c,
		created: time.Now(),
		waiter:  newWaiter(),
	}

	// Every entry gets a unique underlying file. We do not want to reuse
	// existing cache files because we do not know whether they are the
	// result of a successful call to create.
	//
	// This may sound like we should be using an anonymous tempfile, but that
	// would be at odds with the requirement to be able to open and close
	// multiple instances of the file independently: one for the writer, and
	// one for each reader.
	//
	// So the name of the file is irrelevant, but the file must have _a_
	// name.
	f, err := c.createFile()
	if err != nil {
		return nil, nil, err
	}
	// err is the named result, so this also covers a failing newPipe below.
	defer func() {
		if err != nil {
			f.Close()
		}
	}()

	var pr io.ReadCloser
	pr, e.pipe, err = newPipe(f)
	if err != nil {
		return nil, nil, err
	}

	go func() {
		err := runCreate(e.pipe, create)

		// We defer this until after we have removed the cache entry so that the waiter is
		// only unblocked when the cache key has already been pruned from the cache.
		defer e.waiter.SetError(err)

		if err != nil {
			c.logger.WithError(err).Error("create cache entry")
			c.m.Lock()
			defer c.m.Unlock()

			// Only prune the key if it still refers to this entry. By the time
			// create fails, this entry may have been evicted by clean or
			// replaced in getStream, and a newer, healthy entry may be stored
			// under the same key. That entry must stay in the index.
			if c.index[key] == e {
				c.delete(key)
			}
		}
	}()

	return pr, e, nil
}

// runCreate calls create(w) and then closes w. It returns the error from
// create if there is one, otherwise the error from closing w. A panic inside
// create is recovered and reported as an error of the form "panic: <value>".
//
// w is always closed: once explicitly on the success path so that the close
// error can be reported, and once more by the deferred call, which also
// covers the error and panic paths.
func runCreate(w io.WriteCloser, create func(io.Writer) error) (err error) {
	// This function runs in its own goroutine, outside the panic catching
	// middleware that guards RPC handlers, so an uncaught panic here would
	// crash the whole process.
	defer func() {
		if recovered := recover(); recovered != nil {
			err = fmt.Errorf("panic: %v", recovered)
		}
	}()

	defer w.Close()

	if createErr := create(w); createErr != nil {
		return createErr
	}

	return w.Close()
}

// waiter publishes a single error result to any number of waiting
// goroutines.
type waiter struct {
	// done is closed once the result has been set.
	done chan struct{}
	// err is the published result. It is written before done is closed and
	// must only be read after done has been observed closed.
	err error
	// once makes sure the result is set, and done is closed, at most once.
	once sync.Once
}

// newWaiter returns a waiter whose result has not been set yet.
func newWaiter() *waiter { return &waiter{done: make(chan struct{})} }

// SetError publishes err as the result and unblocks all current and future
// calls to Wait. Only the first call has an effect; later calls are ignored.
func (w *waiter) SetError(err error) {
	w.once.Do(func() {
		w.err = err
		close(w.done)
	})
}

// Wait blocks until the result has been set or ctx is done. It returns the
// published result in the first case and ctx.Err() in the second.
//
// If the result is already available when Wait is called, it is always
// returned, even if ctx is done as well.
func (w *waiter) Wait(ctx context.Context) error {
	// A select with several ready cases picks one at random, so check for
	// an already-published result first. Otherwise a caller with a
	// cancelled context could get ctx.Err() instead of the real result.
	select {
	case <-w.done:
		return w.err
	default:
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-w.done:
		return w.err
	}
}

// sleepLoop runs callback each time the channel returned by sleep fires, and
// returns once done is closed. The interval passed to sleep is period,
// except that a non-positive period or one of a minute or more is replaced
// by one minute. sleep is called anew for every iteration.
func sleepLoop(done chan struct{}, period time.Duration, sleep func(time.Duration) <-chan time.Time, callback func()) {
	const maxPeriod = time.Minute

	interval := maxPeriod
	if period > 0 && period < maxPeriod {
		interval = period
	}

	for {
		select {
		case <-sleep(interval):
			callback()
		case <-done:
			return
		}
	}
}