From df4204b3f33e1e9bde89852ff338edec3848ed32 Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Tue, 14 Mar 2023 11:19:45 +0000 Subject: Add MinOccurrences cache middleware This adds a new streamcache middleware called MinOccurrences. Its purpose is to only forward requests to the inner cache for keys that have been seen a minimum number of times. --- internal/streamcache/min_occurences.go | 65 +++++++++++++ internal/streamcache/min_occurrences_test.go | 140 +++++++++++++++++++++++++++ 2 files changed, 205 insertions(+) create mode 100644 internal/streamcache/min_occurences.go create mode 100644 internal/streamcache/min_occurrences_test.go diff --git a/internal/streamcache/min_occurences.go b/internal/streamcache/min_occurences.go new file mode 100644 index 000000000..d48e37e95 --- /dev/null +++ b/internal/streamcache/min_occurences.go @@ -0,0 +1,65 @@ +package streamcache + +import ( + "context" + "io" + "sync" + "time" +) + +// minOccurrences is a streamcache middleware. Its intended use is to +// prevent storing unique keys in the cache. If a key is retrieved only +// once, the cache is not helping, and it costs us disk IO to store an +// entry in the cache. By keeping a counter per key minOccurrences can +// prevent keys from reaching the "real" cache until their count is high +// enough. +type minOccurrences struct { + Cache + N int // Minimum occurrences before a key passes to real Cache + MinAge time.Duration // Minimum time to remember a key + + // We must garbage-collect our counters periodically to prevent unbounded + // memory growth. To efficiently drop many counters we drop an entire + // map. To prevent resetting all counters and creating a wave of cache + // misses, use an "old" and "new" generation of counters. Frequently used + // keys will migrate from "old" to "new" and thereby will not get reset. + m sync.Mutex + oldCount map[string]int + newCount map[string]int + rotatedAt time.Time + + null NullCache +} + +func (mo *minOccurrences) Fetch(ctx context.Context, key string, dst io.Writer, create func(io.Writer) error) (written int64, created bool, err error) { + if mo.incr(key) > mo.N { + return mo.Cache.Fetch(ctx, key, dst, create) + } + return mo.null.Fetch(ctx, key, dst, create) +} + +func (mo *minOccurrences) incr(key string) int { + mo.m.Lock() + defer mo.m.Unlock() + + // Keys live longer than MinAge and that is OK. We just need to make sure + // they get garbage-collected eventually, and that they live at least as + // long as the underlying cache expiry time. + if now := time.Now(); now.Sub(mo.rotatedAt) > mo.MinAge { + mo.rotatedAt = now + mo.oldCount = mo.newCount // This causes mo.oldCount to get garbage-collected + mo.newCount = nil + } + + if mo.newCount == nil { + mo.newCount = make(map[string]int, len(mo.oldCount)) + } + + mo.newCount[key]++ + if old := mo.oldCount[key]; old > 0 { + mo.newCount[key] += old + delete(mo.oldCount, key) + } + + return mo.newCount[key] +} diff --git a/internal/streamcache/min_occurrences_test.go b/internal/streamcache/min_occurrences_test.go new file mode 100644 index 000000000..fc09aa3c7 --- /dev/null +++ b/internal/streamcache/min_occurrences_test.go @@ -0,0 +1,140 @@ +package streamcache + +import ( + "bytes" + "fmt" + "io" + "testing" + "time" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" +) + +func testNullCache() *TestLoggingCache { return &TestLoggingCache{Cache: NullCache{}} } + +func TestMinOccurrences_suppress(t *testing.T) { + ctx := testhelper.Context(t) + + tc := testNullCache() + c := &minOccurrences{ + N: 1, + MinAge: 5 * time.Minute, + Cache: tc, + } + defer c.Stop() + + for i := 0; i < 10; i++ { + output := &bytes.Buffer{} + _, _, err := c.Fetch(ctx, "key", output, writeString("hello")) + require.NoError(t, err) + require.Equal(t, "hello", output.String()) + + if i < c.N { + require.Empty(t, tc.Entries(), "expect first call not to have reached underlying cache") + } else { + require.Len(t, tc.Entries(), 1+i-c.N, "call should reach underlying cache") + } + } +} + +func TestMinOccurrences_manyDifferentCalls(t *testing.T) { + ctx := testhelper.Context(t) + + tc := testNullCache() + c := &minOccurrences{ + N: 1, + MinAge: 5 * time.Minute, + Cache: tc, + } + defer c.Stop() + + const calls = 1000 + fetchErrors := make(chan error, calls) + start := make(chan struct{}) + for i := 0; i < calls; i++ { + go func(i int) { + <-start + _, _, err := c.Fetch(ctx, fmt.Sprintf("key.%d", i), io.Discard, writeString(fmt.Sprintf("hello.%d", i))) + fetchErrors <- err + }(i) + } + + close(start) + for i := 0; i < calls; i++ { + require.NoError(t, <-fetchErrors) + } + + require.Empty(t, tc.Entries(), "because every key was unique, no calls should have reached underlying cache") +} + +type stopCache struct { + NullCache + stopped int +} + +func (sc *stopCache) Stop() { sc.stopped++ } + +func TestMinOccurrences_propagateStop(t *testing.T) { + sc := &stopCache{} + c := minOccurrences{Cache: sc} + require.Equal(t, 0, sc.stopped) + + c.Stop() + require.Equal(t, 1, sc.stopped) +} + +func (mo *minOccurrences) size() int { + mo.m.Lock() + defer mo.m.Unlock() + return len(mo.oldCount) + len(mo.newCount) +} + +func TestMinOccurrences_forgetOldKeys(t *testing.T) { + ctx := testhelper.Context(t) + + tc := testNullCache() + c := &minOccurrences{ + N: 1, + MinAge: time.Hour, + Cache: tc, + } + defer c.Stop() + + require.Equal(t, 0, c.size()) + + const calls = 1000 + for i := 0; i < calls; i++ { + _, _, err := c.Fetch(ctx, fmt.Sprintf("old key.%d", i), io.Discard, writeString("hello")) + require.NoError(t, err) + } + require.Equal(t, calls, c.size(), "old keys") + + c.rotatedAt = time.Now().Add(-2 * c.MinAge) + _, _, _ = c.Fetch(ctx, "new key", io.Discard, writeString("hello")) + require.Equal(t, calls+1, c.size(), "old keys and new key") + + c.rotatedAt = time.Now().Add(-2 * c.MinAge) + _, _, _ = c.Fetch(ctx, "new key", io.Discard, writeString("hello")) + require.Equal(t, 1, c.size(), "only new key") +} + +func TestMinOccurrences_keySurvivesRotation(t *testing.T) { + tc := testNullCache() + c := &minOccurrences{ + N: 1, + MinAge: time.Hour, + Cache: tc, + } + defer c.Stop() + + for i := 0; i < 1000; i++ { + c.rotatedAt = time.Now().Add(-2 * c.MinAge) + require.Equal(t, i+1, c.incr("frequent key"), "frequently occurring key is remembered and incremented") + + // Use i%3 because it takes 2 rotations for a key to be forgotten + if i%3 == 0 { + require.Equal(t, 1, c.incr("infrequent key"), "infrequent key is forgotten and reset") + } + } +} -- cgit v1.2.3 From c48293dcac05a1b3ec83f9b1fbb00a1ca792ab68 Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Fri, 17 Mar 2023 21:27:49 +0000 Subject: Configure pack-objects min occurrences via env var Inject the MinOccurrences middleware into the streamcache call stack and make it configurable via the GITALY_PACK_OBJECTS_CACHE_MIN_OCCURRENCES environment variable. This is a temporary configuration mechanism that will allow us to validate the MinOccurrences middleware on GitLab.com quicker. If it works as expected we will replace the env var with normal config.toml configuration logic. --- internal/gitaly/config/config.go | 13 ++++++++++--- internal/streamcache/cache.go | 7 ++++++- internal/streamcache/cache_test.go | 25 ++++++++++++++++++++----- 3 files changed, 36 insertions(+), 9 deletions(-) diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go index f99c82999..54885f7fe 100644 --- a/internal/gitaly/config/config.go +++ b/internal/gitaly/config/config.go @@ -9,6 +9,7 @@ import ( "path/filepath" "reflect" "regexp" + "strconv" "strings" "syscall" "time" @@ -262,9 +263,10 @@ type PackObjectsLimiting struct { // StreamCacheConfig contains settings for a streamcache instance. type StreamCacheConfig struct { - Enabled bool `toml:"enabled"` // Default: false - Dir string `toml:"dir"` // Default: /+gitaly/PackObjectsCache - MaxAge duration.Duration `toml:"max_age"` // Default: 5m + Enabled bool `toml:"enabled"` // Default: false + Dir string `toml:"dir"` // Default: /+gitaly/PackObjectsCache + MaxAge duration.Duration `toml:"max_age"` // Default: 5m + MinOccurrences int `toml:"min_occurrences"` } // Load initializes the Config variable from file and the environment. @@ -614,6 +616,11 @@ func (cfg *Cfg) configurePackObjectsCache() error { return errPackObjectsCacheRelativePath } + // Temporary environment variable for validation on GitLab.com + if n, err := strconv.Atoi(os.Getenv("GITALY_PACK_OBJECTS_CACHE_MIN_OCCURRENCES")); err == nil && n > 0 { + poc.MinOccurrences = n + } + return nil } diff --git a/internal/streamcache/cache.go b/internal/streamcache/cache.go index 99459c806..326d7ebcb 100644 --- a/internal/streamcache/cache.go +++ b/internal/streamcache/cache.go @@ -149,7 +149,12 @@ func New(cfg config.StreamCacheConfig, logger logrus.FieldLogger) Cache { strconv.Itoa(int(cfg.MaxAge.Duration().Seconds())), ).Set(1) - return newCacheWithSleep(cfg.Dir, cfg.MaxAge.Duration(), time.After, time.After, logger) + maxAge := cfg.MaxAge.Duration() + return &minOccurrences{ + N: cfg.MinOccurrences, + MinAge: maxAge, + Cache: newCacheWithSleep(cfg.Dir, maxAge, time.After, time.After, logger), + } } return NullCache{} diff --git a/internal/streamcache/cache_test.go b/internal/streamcache/cache_test.go index 8d0d0dc82..9e375b1d5 100644 --- a/internal/streamcache/cache_test.go +++ b/internal/streamcache/cache_test.go @@ -30,6 +30,21 @@ func newCache(dir string) Cache { }, log.Default()) } +func innerCache(c Cache) *cache { + for { + switch v := c.(type) { + case *cache: + return v + case *minOccurrences: + c = v.Cache + case *TestLoggingCache: + c = v.Cache + default: + panic(fmt.Errorf("unexpected cache type: %v", v)) + } + } +} + func TestCache_writeOneReadMultiple(t *testing.T) { ctx := testhelper.Context(t) @@ -127,7 +142,7 @@ func requireCacheFiles(t *testing.T, dir string, n int) { func requireCacheEntries(t *testing.T, _c Cache, n int) { t.Helper() - c := _c.(*cache) + c := innerCache(_c) c.m.Lock() defer c.m.Unlock() require.Len(t, c.index, n) @@ -332,7 +347,7 @@ func TestCache_failCreateFile(t *testing.T) { defer c.Stop() createError := errors.New("cannot create file") - c.(*cache).createFile = func() (namedWriteCloser, error) { return nil, createError } + innerCache(c).createFile = func() (namedWriteCloser, error) { return nil, createError } _, _, err := c.Fetch(ctx, "key", io.Discard, func(io.Writer) error { return nil }) require.Equal(t, createError, err) @@ -346,7 +361,7 @@ func TestCache_unWriteableFile(t *testing.T) { c := newCache(tmp) defer c.Stop() - c.(*cache).createFile = func() (namedWriteCloser, error) { + innerCache(c).createFile = func() (namedWriteCloser, error) { return os.OpenFile(filepath.Join(tmp, "unwriteable"), os.O_RDONLY|os.O_CREATE|os.O_EXCL, perm.SharedFile) } @@ -366,7 +381,7 @@ func TestCache_unCloseableFile(t *testing.T) { c := newCache(tmp) defer c.Stop() - c.(*cache).createFile = func() (namedWriteCloser, error) { + innerCache(c).createFile = func() (namedWriteCloser, error) { f, err := os.OpenFile(filepath.Join(tmp, "uncloseable"), os.O_WRONLY|os.O_CREATE|os.O_EXCL, perm.SharedFile) if err != nil { return nil, err @@ -387,7 +402,7 @@ func TestCache_cannotOpenFileForReading(t *testing.T) { c := newCache(tmp) defer c.Stop() - c.(*cache).createFile = func() (namedWriteCloser, error) { + innerCache(c).createFile = func() (namedWriteCloser, error) { f, err := os.OpenFile(filepath.Join(tmp, "unopenable"), os.O_WRONLY|os.O_CREATE|os.O_EXCL, perm.SharedFile) if err != nil { return nil, err -- cgit v1.2.3