Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary refs log tree commit diff
diff options
context:
space:
mode:
author Quang-Minh Nguyen <qmnguyen@gitlab.com> 2023-03-20 13:56:39 +0300
committer Quang-Minh Nguyen <qmnguyen@gitlab.com> 2023-03-20 13:56:39 +0300
commit 1017fbf2f98b23dd93d86173015159fd31df64d0 (patch)
tree d0c80889f8fd686b3a60b909db93d7e1c6a14695
parent f6142399768f6eb6eae011373f9bcfef07ab8119 (diff)
parent c48293dcac05a1b3ec83f9b1fbb00a1ca792ab68 (diff)
Merge branch 'jv-cache-suppressor' into 'master' [wc/testhelper-dial-timeout]
Add MinOccurrences cache middleware

See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5501

Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Reviewed-by: Jacob Vosmaer <jacob@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Reviewed-by: Justin Tobler <jtobler@gitlab.com>
Co-authored-by: Jacob Vosmaer <jacob@gitlab.com>
-rw-r--r-- internal/gitaly/config/config.go 13
-rw-r--r-- internal/streamcache/cache.go 7
-rw-r--r-- internal/streamcache/cache_test.go 25
-rw-r--r-- internal/streamcache/min_occurences.go 65
-rw-r--r-- internal/streamcache/min_occurrences_test.go 140
5 files changed, 241 insertions, 9 deletions
diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go
index 222081d01..db46d4b32 100644
--- a/internal/gitaly/config/config.go
+++ b/internal/gitaly/config/config.go
@@ -9,6 +9,7 @@ import (
"path/filepath"
"reflect"
"regexp"
+ "strconv"
"strings"
"syscall"
"time"
@@ -262,9 +263,10 @@ type PackObjectsLimiting struct {
// StreamCacheConfig contains settings for a streamcache instance.
type StreamCacheConfig struct {
- Enabled bool `toml:"enabled"` // Default: false
- Dir string `toml:"dir"` // Default: <FIRST STORAGE PATH>/+gitaly/PackObjectsCache
- MaxAge duration.Duration `toml:"max_age"` // Default: 5m
+ Enabled bool `toml:"enabled"` // Default: false
+ Dir string `toml:"dir"` // Default: <FIRST STORAGE PATH>/+gitaly/PackObjectsCache
+ MaxAge duration.Duration `toml:"max_age"` // Default: 5m
+ MinOccurrences int `toml:"min_occurrences"`
}
// Load initializes the Config variable from file and the environment.
@@ -614,6 +616,11 @@ func (cfg *Cfg) configurePackObjectsCache() error {
return errPackObjectsCacheRelativePath
}
+ // Temporary environment variable for validation on GitLab.com
+ if n, err := strconv.Atoi(os.Getenv("GITALY_PACK_OBJECTS_CACHE_MIN_OCCURRENCES")); err == nil && n > 0 {
+ poc.MinOccurrences = n
+ }
+
return nil
}
diff --git a/internal/streamcache/cache.go b/internal/streamcache/cache.go
index 99459c806..326d7ebcb 100644
--- a/internal/streamcache/cache.go
+++ b/internal/streamcache/cache.go
@@ -149,7 +149,12 @@ func New(cfg config.StreamCacheConfig, logger logrus.FieldLogger) Cache {
strconv.Itoa(int(cfg.MaxAge.Duration().Seconds())),
).Set(1)
- return newCacheWithSleep(cfg.Dir, cfg.MaxAge.Duration(), time.After, time.After, logger)
+ maxAge := cfg.MaxAge.Duration()
+ return &minOccurrences{
+ N: cfg.MinOccurrences,
+ MinAge: maxAge,
+ Cache: newCacheWithSleep(cfg.Dir, maxAge, time.After, time.After, logger),
+ }
}
return NullCache{}
diff --git a/internal/streamcache/cache_test.go b/internal/streamcache/cache_test.go
index 8d0d0dc82..9e375b1d5 100644
--- a/internal/streamcache/cache_test.go
+++ b/internal/streamcache/cache_test.go
@@ -30,6 +30,21 @@ func newCache(dir string) Cache {
}, log.Default())
}
+func innerCache(c Cache) *cache {
+ for {
+ switch v := c.(type) {
+ case *cache:
+ return v
+ case *minOccurrences:
+ c = v.Cache
+ case *TestLoggingCache:
+ c = v.Cache
+ default:
+ panic(fmt.Errorf("unexpected cache type: %v", v))
+ }
+ }
+}
+
func TestCache_writeOneReadMultiple(t *testing.T) {
ctx := testhelper.Context(t)
@@ -127,7 +142,7 @@ func requireCacheFiles(t *testing.T, dir string, n int) {
func requireCacheEntries(t *testing.T, _c Cache, n int) {
t.Helper()
- c := _c.(*cache)
+ c := innerCache(_c)
c.m.Lock()
defer c.m.Unlock()
require.Len(t, c.index, n)
@@ -332,7 +347,7 @@ func TestCache_failCreateFile(t *testing.T) {
defer c.Stop()
createError := errors.New("cannot create file")
- c.(*cache).createFile = func() (namedWriteCloser, error) { return nil, createError }
+ innerCache(c).createFile = func() (namedWriteCloser, error) { return nil, createError }
_, _, err := c.Fetch(ctx, "key", io.Discard, func(io.Writer) error { return nil })
require.Equal(t, createError, err)
@@ -346,7 +361,7 @@ func TestCache_unWriteableFile(t *testing.T) {
c := newCache(tmp)
defer c.Stop()
- c.(*cache).createFile = func() (namedWriteCloser, error) {
+ innerCache(c).createFile = func() (namedWriteCloser, error) {
return os.OpenFile(filepath.Join(tmp, "unwriteable"), os.O_RDONLY|os.O_CREATE|os.O_EXCL, perm.SharedFile)
}
@@ -366,7 +381,7 @@ func TestCache_unCloseableFile(t *testing.T) {
c := newCache(tmp)
defer c.Stop()
- c.(*cache).createFile = func() (namedWriteCloser, error) {
+ innerCache(c).createFile = func() (namedWriteCloser, error) {
f, err := os.OpenFile(filepath.Join(tmp, "uncloseable"), os.O_WRONLY|os.O_CREATE|os.O_EXCL, perm.SharedFile)
if err != nil {
return nil, err
@@ -387,7 +402,7 @@ func TestCache_cannotOpenFileForReading(t *testing.T) {
c := newCache(tmp)
defer c.Stop()
- c.(*cache).createFile = func() (namedWriteCloser, error) {
+ innerCache(c).createFile = func() (namedWriteCloser, error) {
f, err := os.OpenFile(filepath.Join(tmp, "unopenable"), os.O_WRONLY|os.O_CREATE|os.O_EXCL, perm.SharedFile)
if err != nil {
return nil, err
diff --git a/internal/streamcache/min_occurences.go b/internal/streamcache/min_occurences.go
new file mode 100644
index 000000000..d48e37e95
--- /dev/null
+++ b/internal/streamcache/min_occurences.go
@@ -0,0 +1,65 @@
+package streamcache
+
+import (
+ "context"
+ "io"
+ "sync"
+ "time"
+)
+
+// minOccurrences is a streamcache middleware. Its intended use is to
+// prevent storing unique keys in the cache. If a key is retrieved only
+// once, the cache is not helping, and it costs us disk IO to store an
+// entry in the cache. By keeping a counter per key minOccurrences can
+// prevent keys from reaching the "real" cache until their count is high
+// enough.
+type minOccurrences struct {
+ Cache
+ N int // Minimum occurrences before a key passes to real Cache
+ MinAge time.Duration // Minimum time to remember a key
+
+ // We must garbage-collect our counters periodically to prevent unbounded
+ // memory growth. To efficiently drop many counters we drop an entire
+ // map. To prevent resetting all counters and creating a wave of cache
+ // misses, use an "old" and "new" generation of counters. Frequently used
+ // keys will migrate from "old" to "new" and thereby will not get reset.
+ m sync.Mutex
+ oldCount map[string]int
+ newCount map[string]int
+ rotatedAt time.Time
+
+ null NullCache
+}
+
+func (mo *minOccurrences) Fetch(ctx context.Context, key string, dst io.Writer, create func(io.Writer) error) (written int64, created bool, err error) {
+ if mo.incr(key) > mo.N {
+ return mo.Cache.Fetch(ctx, key, dst, create)
+ }
+ return mo.null.Fetch(ctx, key, dst, create)
+}
+
+func (mo *minOccurrences) incr(key string) int {
+ mo.m.Lock()
+ defer mo.m.Unlock()
+
+ // Keys live longer than MinAge and that is OK. We just need to make sure
+ // they get garbage-collected eventually, and that they live at least as
+ // long as the underlying cache expiry time.
+ if now := time.Now(); now.Sub(mo.rotatedAt) > mo.MinAge {
+ mo.rotatedAt = now
+ mo.oldCount = mo.newCount // This causes mo.oldCount to get garbage-collected
+ mo.newCount = nil
+ }
+
+ if mo.newCount == nil {
+ mo.newCount = make(map[string]int, len(mo.oldCount))
+ }
+
+ mo.newCount[key]++
+ if old := mo.oldCount[key]; old > 0 {
+ mo.newCount[key] += old
+ delete(mo.oldCount, key)
+ }
+
+ return mo.newCount[key]
+}
diff --git a/internal/streamcache/min_occurrences_test.go b/internal/streamcache/min_occurrences_test.go
new file mode 100644
index 000000000..fc09aa3c7
--- /dev/null
+++ b/internal/streamcache/min_occurrences_test.go
@@ -0,0 +1,140 @@
+package streamcache
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
+)
+
+func testNullCache() *TestLoggingCache { return &TestLoggingCache{Cache: NullCache{}} }
+
+func TestMinOccurrences_suppress(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ tc := testNullCache()
+ c := &minOccurrences{
+ N: 1,
+ MinAge: 5 * time.Minute,
+ Cache: tc,
+ }
+ defer c.Stop()
+
+ for i := 0; i < 10; i++ {
+ output := &bytes.Buffer{}
+ _, _, err := c.Fetch(ctx, "key", output, writeString("hello"))
+ require.NoError(t, err)
+ require.Equal(t, "hello", output.String())
+
+ if i < c.N {
+ require.Empty(t, tc.Entries(), "expect first call not to have reached underlying cache")
+ } else {
+ require.Len(t, tc.Entries(), 1+i-c.N, "call should reach underlying cache")
+ }
+ }
+}
+
+func TestMinOccurrences_manyDifferentCalls(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ tc := testNullCache()
+ c := &minOccurrences{
+ N: 1,
+ MinAge: 5 * time.Minute,
+ Cache: tc,
+ }
+ defer c.Stop()
+
+ const calls = 1000
+ fetchErrors := make(chan error, calls)
+ start := make(chan struct{})
+ for i := 0; i < calls; i++ {
+ go func(i int) {
+ <-start
+ _, _, err := c.Fetch(ctx, fmt.Sprintf("key.%d", i), io.Discard, writeString(fmt.Sprintf("hello.%d", i)))
+ fetchErrors <- err
+ }(i)
+ }
+
+ close(start)
+ for i := 0; i < calls; i++ {
+ require.NoError(t, <-fetchErrors)
+ }
+
+ require.Empty(t, tc.Entries(), "because every key was unique, no calls should have reached underlying cache")
+}
+
+type stopCache struct {
+ NullCache
+ stopped int
+}
+
+func (sc *stopCache) Stop() { sc.stopped++ }
+
+func TestMinOccurrences_propagateStop(t *testing.T) {
+ sc := &stopCache{}
+ c := minOccurrences{Cache: sc}
+ require.Equal(t, 0, sc.stopped)
+
+ c.Stop()
+ require.Equal(t, 1, sc.stopped)
+}
+
+func (mo *minOccurrences) size() int {
+ mo.m.Lock()
+ defer mo.m.Unlock()
+ return len(mo.oldCount) + len(mo.newCount)
+}
+
+func TestMinOccurrences_forgetOldKeys(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ tc := testNullCache()
+ c := &minOccurrences{
+ N: 1,
+ MinAge: time.Hour,
+ Cache: tc,
+ }
+ defer c.Stop()
+
+ require.Equal(t, 0, c.size())
+
+ const calls = 1000
+ for i := 0; i < calls; i++ {
+ _, _, err := c.Fetch(ctx, fmt.Sprintf("old key.%d", i), io.Discard, writeString("hello"))
+ require.NoError(t, err)
+ }
+ require.Equal(t, calls, c.size(), "old keys")
+
+ c.rotatedAt = time.Now().Add(-2 * c.MinAge)
+ _, _, _ = c.Fetch(ctx, "new key", io.Discard, writeString("hello"))
+ require.Equal(t, calls+1, c.size(), "old keys and new key")
+
+ c.rotatedAt = time.Now().Add(-2 * c.MinAge)
+ _, _, _ = c.Fetch(ctx, "new key", io.Discard, writeString("hello"))
+ require.Equal(t, 1, c.size(), "only new key")
+}
+
+func TestMinOccurrences_keySurvivesRotation(t *testing.T) {
+ tc := testNullCache()
+ c := &minOccurrences{
+ N: 1,
+ MinAge: time.Hour,
+ Cache: tc,
+ }
+ defer c.Stop()
+
+ for i := 0; i < 1000; i++ {
+ c.rotatedAt = time.Now().Add(-2 * c.MinAge)
+ require.Equal(t, i+1, c.incr("frequent key"), "frequently occurring key is remembered and incremented")
+
+ // Use i%3 because it takes 2 rotations for a key to be forgotten
+ if i%3 == 0 {
+ require.Equal(t, 1, c.incr("infrequent key"), "infrequent key is forgotten and reset")
+ }
+ }
+}