diff options
author | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-03-20 13:56:39 +0300 |
---|---|---|
committer | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-03-20 13:56:39 +0300 |
commit | 1017fbf2f98b23dd93d86173015159fd31df64d0 (patch) | |
tree | d0c80889f8fd686b3a60b909db93d7e1c6a14695 | |
parent | f6142399768f6eb6eae011373f9bcfef07ab8119 (diff) | |
parent | c48293dcac05a1b3ec83f9b1fbb00a1ca792ab68 (diff) |
Merge branch 'jv-cache-suppressor' into 'master'wc/testhelper-dial-timeout
Add MinOccurrences cache middleware
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5501
Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Reviewed-by: Jacob Vosmaer <jacob@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Reviewed-by: Justin Tobler <jtobler@gitlab.com>
Co-authored-by: Jacob Vosmaer <jacob@gitlab.com>
-rw-r--r-- | internal/gitaly/config/config.go | 13 | ||||
-rw-r--r-- | internal/streamcache/cache.go | 7 | ||||
-rw-r--r-- | internal/streamcache/cache_test.go | 25 | ||||
-rw-r--r-- | internal/streamcache/min_occurences.go | 65 | ||||
-rw-r--r-- | internal/streamcache/min_occurrences_test.go | 140 |
5 files changed, 241 insertions, 9 deletions
diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go index 222081d01..db46d4b32 100644 --- a/internal/gitaly/config/config.go +++ b/internal/gitaly/config/config.go @@ -9,6 +9,7 @@ import ( "path/filepath" "reflect" "regexp" + "strconv" "strings" "syscall" "time" @@ -262,9 +263,10 @@ type PackObjectsLimiting struct { // StreamCacheConfig contains settings for a streamcache instance. type StreamCacheConfig struct { - Enabled bool `toml:"enabled"` // Default: false - Dir string `toml:"dir"` // Default: <FIRST STORAGE PATH>/+gitaly/PackObjectsCache - MaxAge duration.Duration `toml:"max_age"` // Default: 5m + Enabled bool `toml:"enabled"` // Default: false + Dir string `toml:"dir"` // Default: <FIRST STORAGE PATH>/+gitaly/PackObjectsCache + MaxAge duration.Duration `toml:"max_age"` // Default: 5m + MinOccurrences int `toml:"min_occurrences"` } // Load initializes the Config variable from file and the environment. @@ -614,6 +616,11 @@ func (cfg *Cfg) configurePackObjectsCache() error { return errPackObjectsCacheRelativePath } + // Temporary environment variable for validation on GitLab.com + if n, err := strconv.Atoi(os.Getenv("GITALY_PACK_OBJECTS_CACHE_MIN_OCCURRENCES")); err == nil && n > 0 { + poc.MinOccurrences = n + } + return nil } diff --git a/internal/streamcache/cache.go b/internal/streamcache/cache.go index 99459c806..326d7ebcb 100644 --- a/internal/streamcache/cache.go +++ b/internal/streamcache/cache.go @@ -149,7 +149,12 @@ func New(cfg config.StreamCacheConfig, logger logrus.FieldLogger) Cache { strconv.Itoa(int(cfg.MaxAge.Duration().Seconds())), ).Set(1) - return newCacheWithSleep(cfg.Dir, cfg.MaxAge.Duration(), time.After, time.After, logger) + maxAge := cfg.MaxAge.Duration() + return &minOccurrences{ + N: cfg.MinOccurrences, + MinAge: maxAge, + Cache: newCacheWithSleep(cfg.Dir, maxAge, time.After, time.After, logger), + } } return NullCache{} diff --git a/internal/streamcache/cache_test.go b/internal/streamcache/cache_test.go index 8d0d0dc82..9e375b1d5 100644 --- a/internal/streamcache/cache_test.go +++ b/internal/streamcache/cache_test.go @@ -30,6 +30,21 @@ func newCache(dir string) Cache { }, log.Default()) } +func innerCache(c Cache) *cache { + for { + switch v := c.(type) { + case *cache: + return v + case *minOccurrences: + c = v.Cache + case *TestLoggingCache: + c = v.Cache + default: + panic(fmt.Errorf("unexpected cache type: %v", v)) + } + } +} + func TestCache_writeOneReadMultiple(t *testing.T) { ctx := testhelper.Context(t) @@ -127,7 +142,7 @@ func requireCacheFiles(t *testing.T, dir string, n int) { func requireCacheEntries(t *testing.T, _c Cache, n int) { t.Helper() - c := _c.(*cache) + c := innerCache(_c) c.m.Lock() defer c.m.Unlock() require.Len(t, c.index, n) @@ -332,7 +347,7 @@ func TestCache_failCreateFile(t *testing.T) { defer c.Stop() createError := errors.New("cannot create file") - c.(*cache).createFile = func() (namedWriteCloser, error) { return nil, createError } + innerCache(c).createFile = func() (namedWriteCloser, error) { return nil, createError } _, _, err := c.Fetch(ctx, "key", io.Discard, func(io.Writer) error { return nil }) require.Equal(t, createError, err) @@ -346,7 +361,7 @@ func TestCache_unWriteableFile(t *testing.T) { c := newCache(tmp) defer c.Stop() - c.(*cache).createFile = func() (namedWriteCloser, error) { + innerCache(c).createFile = func() (namedWriteCloser, error) { return os.OpenFile(filepath.Join(tmp, "unwriteable"), os.O_RDONLY|os.O_CREATE|os.O_EXCL, perm.SharedFile) } @@ -366,7 +381,7 @@ func TestCache_unCloseableFile(t *testing.T) { c := newCache(tmp) defer c.Stop() - c.(*cache).createFile = func() (namedWriteCloser, error) { + innerCache(c).createFile = func() (namedWriteCloser, error) { f, err := os.OpenFile(filepath.Join(tmp, "uncloseable"), os.O_WRONLY|os.O_CREATE|os.O_EXCL, perm.SharedFile) if err != nil { return nil, err @@ -387,7 +402,7 @@ func TestCache_cannotOpenFileForReading(t *testing.T) { c := newCache(tmp) defer c.Stop() - c.(*cache).createFile = func() (namedWriteCloser, error) { + innerCache(c).createFile = func() (namedWriteCloser, error) { f, err := os.OpenFile(filepath.Join(tmp, "unopenable"), os.O_WRONLY|os.O_CREATE|os.O_EXCL, perm.SharedFile) if err != nil { return nil, err diff --git a/internal/streamcache/min_occurences.go b/internal/streamcache/min_occurences.go new file mode 100644 index 000000000..d48e37e95 --- /dev/null +++ b/internal/streamcache/min_occurences.go @@ -0,0 +1,65 @@ +package streamcache + +import ( + "context" + "io" + "sync" + "time" +) + +// minOccurrences is a streamcache middleware. Its intended use is to +// prevent storing unique keys in the cache. If a key is retrieved only +// once, the cache is not helping, and it costs us disk IO to store an +// entry in the cache. By keeping a counter per key minOccurrences can +// prevent keys from reaching the "real" cache until their count is high +// enough. +type minOccurrences struct { + Cache + N int // Minimum occurrences before a key passes to real Cache + MinAge time.Duration // Minimum time to remember a key + + // We must garbage-collect our counters periodically to prevent unbounded + // memory growth. To efficiently drop many counters we drop an entire + // map. To prevent resetting all counters and creating a wave of cache + // misses, use an "old" and "new" generation of counters. Frequently used + // keys will migrate from "old" to "new" and thereby will not get reset. + m sync.Mutex + oldCount map[string]int + newCount map[string]int + rotatedAt time.Time + + null NullCache +} + +func (mo *minOccurrences) Fetch(ctx context.Context, key string, dst io.Writer, create func(io.Writer) error) (written int64, created bool, err error) { + if mo.incr(key) > mo.N { + return mo.Cache.Fetch(ctx, key, dst, create) + } + return mo.null.Fetch(ctx, key, dst, create) +} + +func (mo *minOccurrences) incr(key string) int { + mo.m.Lock() + defer mo.m.Unlock() + + // Keys live longer than MinAge and that is OK. We just need to make sure + // they get garbage-collected eventually, and that they live at least as + // long as the underlying cache expiry time. + if now := time.Now(); now.Sub(mo.rotatedAt) > mo.MinAge { + mo.rotatedAt = now + mo.oldCount = mo.newCount // This causes mo.oldCount to get garbage-collected + mo.newCount = nil + } + + if mo.newCount == nil { + mo.newCount = make(map[string]int, len(mo.oldCount)) + } + + mo.newCount[key]++ + if old := mo.oldCount[key]; old > 0 { + mo.newCount[key] += old + delete(mo.oldCount, key) + } + + return mo.newCount[key] +} diff --git a/internal/streamcache/min_occurrences_test.go b/internal/streamcache/min_occurrences_test.go new file mode 100644 index 000000000..fc09aa3c7 --- /dev/null +++ b/internal/streamcache/min_occurrences_test.go @@ -0,0 +1,140 @@ +package streamcache + +import ( + "bytes" + "fmt" + "io" + "testing" + "time" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" +) + +func testNullCache() *TestLoggingCache { return &TestLoggingCache{Cache: NullCache{}} } + +func TestMinOccurrences_suppress(t *testing.T) { + ctx := testhelper.Context(t) + + tc := testNullCache() + c := &minOccurrences{ + N: 1, + MinAge: 5 * time.Minute, + Cache: tc, + } + defer c.Stop() + + for i := 0; i < 10; i++ { + output := &bytes.Buffer{} + _, _, err := c.Fetch(ctx, "key", output, writeString("hello")) + require.NoError(t, err) + require.Equal(t, "hello", output.String()) + + if i < c.N { + require.Empty(t, tc.Entries(), "expect first call not to have reached underlying cache") + } else { + require.Len(t, tc.Entries(), 1+i-c.N, "call should reach underlying cache") + } + } +} + +func TestMinOccurrences_manyDifferentCalls(t *testing.T) { + ctx := testhelper.Context(t) + + tc := testNullCache() + c := &minOccurrences{ + N: 1, + MinAge: 5 * time.Minute, + Cache: tc, + } + defer c.Stop() + + const calls = 1000 + fetchErrors := make(chan error, calls) + start := make(chan struct{}) + for i := 0; i < calls; i++ { + go func(i int) { + <-start + _, _, err := c.Fetch(ctx, fmt.Sprintf("key.%d", i), io.Discard, writeString(fmt.Sprintf("hello.%d", i))) + fetchErrors <- err + }(i) + } + + close(start) + for i := 0; i < calls; i++ { + require.NoError(t, <-fetchErrors) + } + + require.Empty(t, tc.Entries(), "because every key was unique, no calls should have reached underlying cache") +} + +type stopCache struct { + NullCache + stopped int +} + +func (sc *stopCache) Stop() { sc.stopped++ } + +func TestMinOccurrences_propagateStop(t *testing.T) { + sc := &stopCache{} + c := minOccurrences{Cache: sc} + require.Equal(t, 0, sc.stopped) + + c.Stop() + require.Equal(t, 1, sc.stopped) +} + +func (mo *minOccurrences) size() int { + mo.m.Lock() + defer mo.m.Unlock() + return len(mo.oldCount) + len(mo.newCount) +} + +func TestMinOccurrences_forgetOldKeys(t *testing.T) { + ctx := testhelper.Context(t) + + tc := testNullCache() + c := &minOccurrences{ + N: 1, + MinAge: time.Hour, + Cache: tc, + } + defer c.Stop() + + require.Equal(t, 0, c.size()) + + const calls = 1000 + for i := 0; i < calls; i++ { + _, _, err := c.Fetch(ctx, fmt.Sprintf("old key.%d", i), io.Discard, writeString("hello")) + require.NoError(t, err) + } + require.Equal(t, calls, c.size(), "old keys") + + c.rotatedAt = time.Now().Add(-2 * c.MinAge) + _, _, _ = c.Fetch(ctx, "new key", io.Discard, writeString("hello")) + require.Equal(t, calls+1, c.size(), "old keys and new key") + + c.rotatedAt = time.Now().Add(-2 * c.MinAge) + _, _, _ = c.Fetch(ctx, "new key", io.Discard, writeString("hello")) + require.Equal(t, 1, c.size(), "only new key") +} + +func TestMinOccurrences_keySurvivesRotation(t *testing.T) { + tc := testNullCache() + c := &minOccurrences{ + N: 1, + MinAge: time.Hour, + Cache: tc, + } + defer c.Stop() + + for i := 0; i < 1000; i++ { + c.rotatedAt = time.Now().Add(-2 * c.MinAge) + require.Equal(t, i+1, c.incr("frequent key"), "frequently occurring key is remembered and incremented") + + // Use i%3 because it takes 2 rotations for a key to be forgotten + if i%3 == 0 { + require.Equal(t, 1, c.incr("infrequent key"), "infrequent key is forgotten and reset") + } + } +} |