diff options
author | Jacob Vosmaer <jacob@gitlab.com> | 2023-03-14 14:19:45 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2023-03-20 12:19:54 +0300 |
commit | df4204b3f33e1e9bde89852ff338edec3848ed32 (patch) | |
tree | 030986d72a73f532dca75cb26284ceb75c49a56e | |
parent | 220959f0ffc3d01fa448cc2c7b45b082d56690ef (diff) |
Add MinOccurrences cache middleware
This adds a new streamcache middleware called MinOccurrences. Its
purpose is to only forward requests to the inner cache for keys that
have been seen a minimum number of times.
-rw-r--r-- | internal/streamcache/min_occurences.go | 65 | ||||
-rw-r--r-- | internal/streamcache/min_occurrences_test.go | 140 |
2 files changed, 205 insertions, 0 deletions
diff --git a/internal/streamcache/min_occurences.go b/internal/streamcache/min_occurences.go new file mode 100644 index 000000000..d48e37e95 --- /dev/null +++ b/internal/streamcache/min_occurences.go @@ -0,0 +1,65 @@ +package streamcache + +import ( + "context" + "io" + "sync" + "time" +) + +// minOccurrences is a streamcache middleware. Its intended use is to +// prevent storing unique keys in the cache. If a key is retrieved only +// once, the cache is not helping, and it costs us disk IO to store an +// entry in the cache. By keeping a counter per key minOccurrences can +// prevent keys from reaching the "real" cache until their count is high +// enough. +type minOccurrences struct { + Cache + N int // Minimum occurrences before a key passes to real Cache + MinAge time.Duration // Minimum time to remember a key + + // We must garbage-collect our counters periodically to prevent unbounded + // memory growth. To efficiently drop many counters we drop an entire + // map. To prevent resetting all counters and creating a wave of cache + // misses, use an "old" and "new" generation of counters. Frequently used + // keys will migrate from "old" to "new" and thereby will not get reset. + m sync.Mutex + oldCount map[string]int + newCount map[string]int + rotatedAt time.Time + + null NullCache +} + +func (mo *minOccurrences) Fetch(ctx context.Context, key string, dst io.Writer, create func(io.Writer) error) (written int64, created bool, err error) { + if mo.incr(key) > mo.N { + return mo.Cache.Fetch(ctx, key, dst, create) + } + return mo.null.Fetch(ctx, key, dst, create) +} + +func (mo *minOccurrences) incr(key string) int { + mo.m.Lock() + defer mo.m.Unlock() + + // Keys live longer than MinAge and that is OK. We just need to make sure + // they get garbage-collected eventually, and that they live at least as + // long as the underlying cache expiry time. + if now := time.Now(); now.Sub(mo.rotatedAt) > mo.MinAge { + mo.rotatedAt = now + mo.oldCount = mo.newCount // This causes mo.oldCount to get garbage-collected + mo.newCount = nil + } + + if mo.newCount == nil { + mo.newCount = make(map[string]int, len(mo.oldCount)) + } + + mo.newCount[key]++ + if old := mo.oldCount[key]; old > 0 { + mo.newCount[key] += old + delete(mo.oldCount, key) + } + + return mo.newCount[key] +} diff --git a/internal/streamcache/min_occurrences_test.go b/internal/streamcache/min_occurrences_test.go new file mode 100644 index 000000000..fc09aa3c7 --- /dev/null +++ b/internal/streamcache/min_occurrences_test.go @@ -0,0 +1,140 @@ +package streamcache + +import ( + "bytes" + "fmt" + "io" + "testing" + "time" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" +) + +func testNullCache() *TestLoggingCache { return &TestLoggingCache{Cache: NullCache{}} } + +func TestMinOccurrences_suppress(t *testing.T) { + ctx := testhelper.Context(t) + + tc := testNullCache() + c := &minOccurrences{ + N: 1, + MinAge: 5 * time.Minute, + Cache: tc, + } + defer c.Stop() + + for i := 0; i < 10; i++ { + output := &bytes.Buffer{} + _, _, err := c.Fetch(ctx, "key", output, writeString("hello")) + require.NoError(t, err) + require.Equal(t, "hello", output.String()) + + if i < c.N { + require.Empty(t, tc.Entries(), "expect first call not to have reached underlying cache") + } else { + require.Len(t, tc.Entries(), 1+i-c.N, "call should reach underlying cache") + } + } +} + +func TestMinOccurrences_manyDifferentCalls(t *testing.T) { + ctx := testhelper.Context(t) + + tc := testNullCache() + c := &minOccurrences{ + N: 1, + MinAge: 5 * time.Minute, + Cache: tc, + } + defer c.Stop() + + const calls = 1000 + fetchErrors := make(chan error, calls) + start := make(chan struct{}) + for i := 0; i < calls; i++ { + go func(i int) { + <-start + _, _, err := c.Fetch(ctx, fmt.Sprintf("key.%d", i), io.Discard, writeString(fmt.Sprintf("hello.%d", i))) + fetchErrors <- err + }(i) + } + + close(start) + for i := 0; i < calls; i++ { + require.NoError(t, <-fetchErrors) + } + + require.Empty(t, tc.Entries(), "because every key was unique, no calls should have reached underlying cache") +} + +type stopCache struct { + NullCache + stopped int +} + +func (sc *stopCache) Stop() { sc.stopped++ } + +func TestMinOccurrences_propagateStop(t *testing.T) { + sc := &stopCache{} + c := minOccurrences{Cache: sc} + require.Equal(t, 0, sc.stopped) + + c.Stop() + require.Equal(t, 1, sc.stopped) +} + +func (mo *minOccurrences) size() int { + mo.m.Lock() + defer mo.m.Unlock() + return len(mo.oldCount) + len(mo.newCount) +} + +func TestMinOccurrences_forgetOldKeys(t *testing.T) { + ctx := testhelper.Context(t) + + tc := testNullCache() + c := &minOccurrences{ + N: 1, + MinAge: time.Hour, + Cache: tc, + } + defer c.Stop() + + require.Equal(t, 0, c.size()) + + const calls = 1000 + for i := 0; i < calls; i++ { + _, _, err := c.Fetch(ctx, fmt.Sprintf("old key.%d", i), io.Discard, writeString("hello")) + require.NoError(t, err) + } + require.Equal(t, calls, c.size(), "old keys") + + c.rotatedAt = time.Now().Add(-2 * c.MinAge) + _, _, _ = c.Fetch(ctx, "new key", io.Discard, writeString("hello")) + require.Equal(t, calls+1, c.size(), "old keys and new key") + + c.rotatedAt = time.Now().Add(-2 * c.MinAge) + _, _, _ = c.Fetch(ctx, "new key", io.Discard, writeString("hello")) + require.Equal(t, 1, c.size(), "only new key") +} + +func TestMinOccurrences_keySurvivesRotation(t *testing.T) { + tc := testNullCache() + c := &minOccurrences{ + N: 1, + MinAge: time.Hour, + Cache: tc, + } + defer c.Stop() + + for i := 0; i < 1000; i++ { + c.rotatedAt = time.Now().Add(-2 * c.MinAge) + require.Equal(t, i+1, c.incr("frequent key"), "frequently occurring key is remembered and incremented") + + // Use i%3 because it takes 2 rotations for a key to be forgotten + if i%3 == 0 { + require.Equal(t, 1, c.incr("infrequent key"), "infrequent key is forgotten and reset") + } + } +} |