Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Vosmaer <jacob@gitlab.com>2023-03-14 14:19:45 +0300
committerJacob Vosmaer <jacob@gitlab.com>2023-03-20 12:19:54 +0300
commitdf4204b3f33e1e9bde89852ff338edec3848ed32 (patch)
tree030986d72a73f532dca75cb26284ceb75c49a56e
parent220959f0ffc3d01fa448cc2c7b45b082d56690ef (diff)
Add MinOccurrences cache middleware
This adds a new streamcache middleware called MinOccurrences. Its purpose is to forward requests to the inner cache only for keys that have been seen a minimum number of times.
-rw-r--r--internal/streamcache/min_occurences.go65
-rw-r--r--internal/streamcache/min_occurrences_test.go140
2 files changed, 205 insertions, 0 deletions
diff --git a/internal/streamcache/min_occurences.go b/internal/streamcache/min_occurences.go
new file mode 100644
index 000000000..d48e37e95
--- /dev/null
+++ b/internal/streamcache/min_occurences.go
@@ -0,0 +1,65 @@
+package streamcache
+
+import (
+ "context"
+ "io"
+ "sync"
+ "time"
+)
+
+// minOccurrences is a streamcache middleware. Its intended use is to
+// prevent storing unique keys in the cache. If a key is retrieved only
+// once, the cache is not helping, and it costs us disk IO to store an
+// entry in the cache. By keeping a counter per key minOccurrences can
+// prevent keys from reaching the "real" cache until their count is high
+// enough.
// minOccurrences is a streamcache middleware. Its intended use is to
// prevent storing unique keys in the cache. If a key is retrieved only
// once, the cache is not helping, and it costs us disk IO to store an
// entry in the cache. By keeping a counter per key minOccurrences can
// prevent keys from reaching the "real" cache until their count is high
// enough.
type minOccurrences struct {
	Cache // The "real" underlying cache that popular keys are forwarded to.

	// N is the minimum number of occurrences before a key passes to the real
	// Cache: Fetch forwards a key only once its counter exceeds N, i.e. from
	// the (N+1)th occurrence onwards.
	N int
	// MinAge is the minimum time to remember a key. Because of the two
	// counter generations below, an untouched key may survive up to two
	// rotation intervals before it is forgotten.
	MinAge time.Duration

	// We must garbage-collect our counters periodically to prevent unbounded
	// memory growth. To efficiently drop many counters we drop an entire
	// map. To prevent resetting all counters and creating a wave of cache
	// misses, use an "old" and "new" generation of counters. Frequently used
	// keys will migrate from "old" to "new" and thereby will not get reset.
	m         sync.Mutex     // guards oldCount, newCount and rotatedAt
	oldCount  map[string]int // previous generation, dropped wholesale on rotation
	newCount  map[string]int // current generation, receives all increments
	rotatedAt time.Time      // when the generations were last rotated

	// null is a pass-through cache used for keys that have not yet reached
	// enough occurrences.
	null NullCache
}
+
+func (mo *minOccurrences) Fetch(ctx context.Context, key string, dst io.Writer, create func(io.Writer) error) (written int64, created bool, err error) {
+ if mo.incr(key) > mo.N {
+ return mo.Cache.Fetch(ctx, key, dst, create)
+ }
+ return mo.null.Fetch(ctx, key, dst, create)
+}
+
+func (mo *minOccurrences) incr(key string) int {
+ mo.m.Lock()
+ defer mo.m.Unlock()
+
+ // Keys live longer than MinAge and that is OK. We just need to make sure
+ // they get garbage-collected eventually, and that they live at least as
+ // long as the underlying cache expiry time.
+ if now := time.Now(); now.Sub(mo.rotatedAt) > mo.MinAge {
+ mo.rotatedAt = now
+ mo.oldCount = mo.newCount // This causes mo.oldCount to get garbage-collected
+ mo.newCount = nil
+ }
+
+ if mo.newCount == nil {
+ mo.newCount = make(map[string]int, len(mo.oldCount))
+ }
+
+ mo.newCount[key]++
+ if old := mo.oldCount[key]; old > 0 {
+ mo.newCount[key] += old
+ delete(mo.oldCount, key)
+ }
+
+ return mo.newCount[key]
+}
diff --git a/internal/streamcache/min_occurrences_test.go b/internal/streamcache/min_occurrences_test.go
new file mode 100644
index 000000000..fc09aa3c7
--- /dev/null
+++ b/internal/streamcache/min_occurrences_test.go
@@ -0,0 +1,140 @@
+package streamcache
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
+)
+
+func testNullCache() *TestLoggingCache { return &TestLoggingCache{Cache: NullCache{}} }
+
+func TestMinOccurrences_suppress(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ tc := testNullCache()
+ c := &minOccurrences{
+ N: 1,
+ MinAge: 5 * time.Minute,
+ Cache: tc,
+ }
+ defer c.Stop()
+
+ for i := 0; i < 10; i++ {
+ output := &bytes.Buffer{}
+ _, _, err := c.Fetch(ctx, "key", output, writeString("hello"))
+ require.NoError(t, err)
+ require.Equal(t, "hello", output.String())
+
+ if i < c.N {
+ require.Empty(t, tc.Entries(), "expect first call not to have reached underlying cache")
+ } else {
+ require.Len(t, tc.Entries(), 1+i-c.N, "call should reach underlying cache")
+ }
+ }
+}
+
+func TestMinOccurrences_manyDifferentCalls(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ tc := testNullCache()
+ c := &minOccurrences{
+ N: 1,
+ MinAge: 5 * time.Minute,
+ Cache: tc,
+ }
+ defer c.Stop()
+
+ const calls = 1000
+ fetchErrors := make(chan error, calls)
+ start := make(chan struct{})
+ for i := 0; i < calls; i++ {
+ go func(i int) {
+ <-start
+ _, _, err := c.Fetch(ctx, fmt.Sprintf("key.%d", i), io.Discard, writeString(fmt.Sprintf("hello.%d", i)))
+ fetchErrors <- err
+ }(i)
+ }
+
+ close(start)
+ for i := 0; i < calls; i++ {
+ require.NoError(t, <-fetchErrors)
+ }
+
+ require.Empty(t, tc.Entries(), "because every key was unique, no calls should have reached underlying cache")
+}
+
// stopCache is a test double that counts Stop calls so tests can assert that
// Stop is propagated to the wrapped cache.
type stopCache struct {
	NullCache
	stopped int // number of times Stop has been called
}

// Stop records the call instead of stopping anything.
func (sc *stopCache) Stop() { sc.stopped++ }
+
+func TestMinOccurrences_propagateStop(t *testing.T) {
+ sc := &stopCache{}
+ c := minOccurrences{Cache: sc}
+ require.Equal(t, 0, sc.stopped)
+
+ c.Stop()
+ require.Equal(t, 1, sc.stopped)
+}
+
+func (mo *minOccurrences) size() int {
+ mo.m.Lock()
+ defer mo.m.Unlock()
+ return len(mo.oldCount) + len(mo.newCount)
+}
+
+func TestMinOccurrences_forgetOldKeys(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ tc := testNullCache()
+ c := &minOccurrences{
+ N: 1,
+ MinAge: time.Hour,
+ Cache: tc,
+ }
+ defer c.Stop()
+
+ require.Equal(t, 0, c.size())
+
+ const calls = 1000
+ for i := 0; i < calls; i++ {
+ _, _, err := c.Fetch(ctx, fmt.Sprintf("old key.%d", i), io.Discard, writeString("hello"))
+ require.NoError(t, err)
+ }
+ require.Equal(t, calls, c.size(), "old keys")
+
+ c.rotatedAt = time.Now().Add(-2 * c.MinAge)
+ _, _, _ = c.Fetch(ctx, "new key", io.Discard, writeString("hello"))
+ require.Equal(t, calls+1, c.size(), "old keys and new key")
+
+ c.rotatedAt = time.Now().Add(-2 * c.MinAge)
+ _, _, _ = c.Fetch(ctx, "new key", io.Discard, writeString("hello"))
+ require.Equal(t, 1, c.size(), "only new key")
+}
+
+func TestMinOccurrences_keySurvivesRotation(t *testing.T) {
+ tc := testNullCache()
+ c := &minOccurrences{
+ N: 1,
+ MinAge: time.Hour,
+ Cache: tc,
+ }
+ defer c.Stop()
+
+ for i := 0; i < 1000; i++ {
+ c.rotatedAt = time.Now().Add(-2 * c.MinAge)
+ require.Equal(t, i+1, c.incr("frequent key"), "frequently occurring key is remembered and incremented")
+
+ // Use i%3 because it takes 2 rotations for a key to be forgotten
+ if i%3 == 0 {
+ require.Equal(t, 1, c.incr("infrequent key"), "infrequent key is forgotten and reset")
+ }
+ }
+}