gitlab.com/gitlab-org/gitaly.git

author    Quang-Minh Nguyen <qmnguyen@gitlab.com>  2023-03-14 08:32:28 +0300
committer Quang-Minh Nguyen <qmnguyen@gitlab.com>  2023-03-14 08:32:28 +0300
commit    220959f0ffc3d01fa448cc2c7b45b082d56690ef (patch)
tree      83d80401941536ea1f39b40a2ec2d250271a5c61 /internal/streamcache
parent    906fdaa2fd6f4dc4baec1b36b9341f4157e2042a (diff)
parent    de53fe2f5c773a059b599674ccaf459b5daf4b49 (diff)

Merge branch 'jv-streamcache-interface' into 'master'

Simplify streamcache.Cache for efficient null implementation

See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5444

Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: John Cai <jcai@gitlab.com>
Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Co-authored-by: Jacob Vosmaer <jacob@gitlab.com>
Diffstat (limited to 'internal/streamcache'):
 internal/streamcache/cache.go      |  97
 internal/streamcache/cache_test.go | 162
 2 files changed, 102 insertions(+), 157 deletions(-)
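
In short, this merge replaces the stream-returning FindOrCreate method with a Fetch method that copies the cached data directly into a caller-supplied io.Writer, so the interface no longer has to expose *Stream, Close(), or Wait(), and a pipe-free NullCache becomes possible. A rough caller-side sketch of the migration follows; the fetchBlob helper and its arguments are illustrative only and not part of this diff:

package example

import (
	"context"
	"io"

	"gitlab.com/gitlab-org/gitaly/v15/internal/streamcache"
)

// fetchBlob is a hypothetical caller. Before this commit it would have used
// FindOrCreate, read the returned *Stream itself, and surfaced write-side
// errors via Stream.Wait:
//
//	s, _, err := c.FindOrCreate(key, fill)
//	if err != nil { return 0, err }
//	defer s.Close()
//	n, err := io.Copy(dst, s)
//	if err != nil { return n, err }
//	return n, s.Wait(ctx)
//
// With the new interface the copy and the Wait are folded into a single
// Fetch call that writes straight into dst and reports the byte count.
func fetchBlob(ctx context.Context, c streamcache.Cache, key string, dst io.Writer, fill func(io.Writer) error) (int64, error) {
	written, _, err := c.Fetch(ctx, key, dst, fill)
	return written, err
}
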
diff --git a/internal/streamcache/cache.go b/internal/streamcache/cache.go
index 857140220..99459c806 100644
--- a/internal/streamcache/cache.go
+++ b/internal/streamcache/cache.go
@@ -38,6 +38,7 @@ import (
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/v15/internal/dontpanic"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/helper"
)
var (
@@ -60,10 +61,12 @@ var (
// Cache is a cache for large byte streams.
type Cache interface {
- // FindOrCreate finds or creates a cache entry. If the create callback
- // runs, it will be asynchronous and created is set to true. Callers must
- // Close() the returned stream to free underlying resources.
- FindOrCreate(key string, create func(io.Writer) error) (s *Stream, created bool, err error)
+ // Fetch finds or creates a cache entry and writes its contents into dst.
+ // If the create callback is called the created return value is true. In
+ // case of a non-nil error return, the create callback may still be
+ // running in a goroutine for the benefit of another caller of Fetch with
+ // the same key.
+ Fetch(ctx context.Context, key string, dst io.Writer, create func(io.Writer) error) (written int64, created bool, err error)
// Stop stops the cleanup goroutines of the cache.
Stop()
}
@@ -86,15 +89,15 @@ type TestLoggingCache struct {
m sync.Mutex
}
-// FindOrCreate calls the underlying FindOrCreate method and logs the
+// Fetch calls the underlying Fetch method and logs the
// result.
-func (tlc *TestLoggingCache) FindOrCreate(key string, create func(io.Writer) error) (s *Stream, created bool, err error) {
- s, created, err = tlc.Cache.FindOrCreate(key, create)
+func (tlc *TestLoggingCache) Fetch(ctx context.Context, key string, dst io.Writer, create func(io.Writer) error) (written int64, created bool, err error) {
+ written, created, err = tlc.Cache.Fetch(ctx, key, dst, create)
tlc.m.Lock()
defer tlc.m.Unlock()
tlc.entries = append(tlc.entries, &TestLogEntry{Key: key, Created: created, Err: err})
- return s, created, err
+ return written, created, err
}
// Entries returns a reference to the log of entries observed so far.
@@ -112,13 +115,11 @@ var _ = Cache(NullCache{})
// and it uses no storage.
type NullCache struct{}
-// FindOrCreate runs create in a goroutine and lets the caller consume
-// the result via the returned stream. The created flag is always true.
-func (NullCache) FindOrCreate(key string, create func(io.Writer) error) (s *Stream, created bool, err error) {
- pr, pw := io.Pipe()
- w := newWaiter()
- go func() { w.SetError(runCreate(pw, create)) }()
- return &Stream{ReadCloser: pr, waiter: w}, true, nil
+// Fetch runs create(dst). The created flag is always true.
+func (NullCache) Fetch(ctx context.Context, key string, dst io.Writer, create func(io.Writer) error) (written int64, created bool, err error) {
+ w := &helper.CountingWriter{W: dst}
+ err = create(w)
+ return w.N, true, err
}
// Stop is a no-op.
@@ -227,13 +228,33 @@ func (c *cache) setIndexSize() {
cacheIndexSize.WithLabelValues(c.dir).Set(float64(len(c.index)))
}
-func (c *cache) FindOrCreate(key string, create func(io.Writer) error) (s *Stream, created bool, err error) {
+func (c *cache) Fetch(ctx context.Context, key string, dst io.Writer, create func(io.Writer) error) (written int64, created bool, err error) {
+ var (
+ rc io.ReadCloser
+ wt *waiter
+ )
+ rc, wt, created, err = c.getStream(key, create)
+ if err != nil {
+ return
+ }
+ defer rc.Close()
+
+ written, err = io.Copy(dst, rc)
+ if err != nil {
+ return
+ }
+
+ err = wt.Wait(ctx)
+ return
+}
+
+func (c *cache) getStream(key string, create func(io.Writer) error) (_ io.ReadCloser, _ *waiter, created bool, err error) {
c.m.Lock()
defer c.m.Unlock()
if e := c.index[key]; e != nil {
- if s, err := e.Open(); err == nil {
- return s, false, nil
+ if r, err := e.pipe.OpenReader(); err == nil {
+ return r, e.waiter, false, nil
}
// In this case err != nil. That is allowed to happen, for instance if
@@ -243,15 +264,15 @@ func (c *cache) FindOrCreate(key string, create func(io.Writer) error) (s *Strea
c.delete(key)
}
- s, e, err := c.newEntry(key, create)
+ r, e, err := c.newEntry(key, create)
if err != nil {
- return nil, false, err
+ return nil, nil, false, err
}
c.index[key] = e
c.setIndexSize()
- return s, true, nil
+ return r, e.waiter, true, nil
}
type entry struct {
@@ -262,28 +283,7 @@ type entry struct {
waiter *waiter
}
-// Stream abstracts a stream of bytes (via Read()) plus an error (via
-// Wait()). Callers must always call Close() to prevent resource leaks.
-type Stream struct {
- waiter *waiter
- io.ReadCloser
-}
-
-// Wait returns the error value of the Stream. If ctx is canceled,
-// Wait unblocks and returns early.
-func (s *Stream) Wait(ctx context.Context) error { return s.waiter.Wait(ctx) }
-
-// WriteTo implements io.WriterTo. For some w on some platforms, this
-// uses sendfile to make copying data more efficient.
-func (s *Stream) WriteTo(w io.Writer) (int64, error) {
- if wt, ok := s.ReadCloser.(io.WriterTo); ok {
- return wt.WriteTo(w)
- }
-
- return io.Copy(w, s.ReadCloser)
-}
-
-func (c *cache) newEntry(key string, create func(io.Writer) error) (_ *Stream, _ *entry, err error) {
+func (c *cache) newEntry(key string, create func(io.Writer) error) (_ io.ReadCloser, _ *entry, err error) {
e := &entry{
key: key,
cache: c,
@@ -333,11 +333,7 @@ func (c *cache) newEntry(key string, create func(io.Writer) error) (_ *Stream, _
}
}()
- return e.wrapReadCloser(pr), e, nil
-}
-
-func (e *entry) wrapReadCloser(r io.ReadCloser) *Stream {
- return &Stream{ReadCloser: r, waiter: e.waiter}
+ return pr, e, nil
}
func runCreate(w io.WriteCloser, create func(io.Writer) error) (err error) {
@@ -363,11 +359,6 @@ func runCreate(w io.WriteCloser, create func(io.Writer) error) (err error) {
return nil
}
-func (e *entry) Open() (*Stream, error) {
- r, err := e.pipe.OpenReader()
- return e.wrapReadCloser(r), err
-}
-
type waiter struct {
done chan struct{}
err error
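
Note that the new NullCache.Fetch above relies on helper.CountingWriter, whose implementation is not part of this diff; only its W field (the wrapped writer) and N field (the running byte count) are visible here. A minimal stand-in with that shape, for illustration only:

package example

import "io"

// countingWriter mirrors how helper.CountingWriter appears to be used above:
// it forwards every Write to the wrapped writer W and accumulates the number
// of bytes written in N. The real helper may differ in its details.
type countingWriter struct {
	W io.Writer
	N int64
}

func (cw *countingWriter) Write(p []byte) (int, error) {
	n, err := cw.W.Write(p)
	cw.N += int64(n)
	return n, err
}

This is what lets the null implementation drop the io.Pipe and background goroutine of the old FindOrCreate path: create writes synchronously into dst, and the byte count comes from the wrapper.
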
diff --git a/internal/streamcache/cache_test.go b/internal/streamcache/cache_test.go
index 9a2bb3603..8d0d0dc82 100644
--- a/internal/streamcache/cache_test.go
+++ b/internal/streamcache/cache_test.go
@@ -46,16 +46,11 @@ func TestCache_writeOneReadMultiple(t *testing.T) {
for i := 0; i < N; i++ {
t.Run(fmt.Sprintf("read %d", i), func(t *testing.T) {
- r, created, err := c.FindOrCreate(key, writeString(content(i)))
+ buf := &bytes.Buffer{}
+ _, created, err := c.Fetch(ctx, key, buf, writeString(content(i)))
require.NoError(t, err)
- defer r.Close()
-
require.Equal(t, i == 0, created, "all calls except the first one should be cache hits")
-
- out, err := io.ReadAll(r)
- require.NoError(t, err)
- require.NoError(t, r.Wait(ctx))
- require.Equal(t, content(0), string(out), "expect cache hits for all i > 0")
+ require.Equal(t, content(0), buf.String(), "expect cache hits for all i > 0")
})
}
@@ -88,19 +83,14 @@ func TestCache_manyConcurrentWrites(t *testing.T) {
errors <- func() error {
<-start
- r, _, err := c.FindOrCreate(key, writeString(content[i]))
- if err != nil {
- return err
- }
- defer r.Close()
-
- out, err := io.ReadAll(r)
+ buf := &bytes.Buffer{}
+ _, _, err := c.Fetch(ctx, key, buf, writeString(content[i]))
if err != nil {
return err
}
- output[i] = string(out)
- return r.Wait(ctx)
+ output[i] = buf.String()
+ return nil
}()
}(i)
}
@@ -113,10 +103,10 @@ func TestCache_manyConcurrentWrites(t *testing.T) {
}
for i := 0; i < N; i++ {
- require.Equal(t, output[0], output[i], "all calls to FindOrCreate returned the same bytes")
+ require.Equal(t, output[0], output[i], "all calls to Fetch returned the same bytes")
}
- require.Contains(t, content, output[0], "data returned by FindOrCreate is not mangled")
+ require.Contains(t, content, output[0], "data returned by Fetch is not mangled")
requireCacheFiles(t, tmp, 1)
}
@@ -145,6 +135,7 @@ func requireCacheEntries(t *testing.T, _c Cache, n int) {
func TestCache_deletedFile(t *testing.T) {
tmp := testhelper.TempDir(t)
+ ctx := testhelper.Context(t)
c := newCache(tmp)
defer c.Stop()
@@ -154,9 +145,9 @@ func TestCache_deletedFile(t *testing.T) {
)
content := func(i int) string { return fmt.Sprintf("content %d", i) }
- r1, created, err := c.FindOrCreate(key, writeString(content(1)))
+ buf1 := &bytes.Buffer{}
+ _, created, err := c.Fetch(ctx, key, buf1, writeString(content(1)))
require.NoError(t, err)
- defer r1.Close()
require.True(t, created)
require.NoError(t, os.RemoveAll(tmp), "wipe out underlying files of cache")
@@ -166,18 +157,13 @@ func TestCache_deletedFile(t *testing.T) {
requireCacheFiles(t, tmp, 0)
requireCacheEntries(t, c, 1)
- r2, created, err := c.FindOrCreate(key, writeString(content(2)))
+ buf2 := &bytes.Buffer{}
+ _, created, err = c.Fetch(ctx, key, buf2, writeString(content(2)))
require.NoError(t, err)
- defer r2.Close()
require.True(t, created, "because the first file is gone, cache is forced to create a new entry")
- out1, err := io.ReadAll(r1)
- require.NoError(t, err)
- require.Equal(t, content(1), string(out1), "r1 should still see its original pre-wipe contents")
-
- out2, err := io.ReadAll(r2)
- require.NoError(t, err)
- require.Equal(t, content(2), string(out2), "r2 should see the new post-wipe contents")
+ require.Equal(t, content(1), buf1.String(), "r1 should still see its original pre-wipe contents")
+ require.Equal(t, content(2), buf2.String(), "r2 should see the new post-wipe contents")
}
func TestCache_scope(t *testing.T) {
@@ -194,38 +180,38 @@ func TestCache_scope(t *testing.T) {
// to test that they do not trample on each others files.
cache := make([]Cache, N)
input := make([]string, N)
- reader := make([]*Stream, N)
- var err error
+ output := make([]string, N)
+ wg := &sync.WaitGroup{}
+ wg.Add(N)
for i := 0; i < N; i++ {
- input[i] = fmt.Sprintf("test content %d", i)
- cache[i] = newCache(tmp)
- defer func(i int) { cache[i].Stop() }(i)
-
- var created bool
- reader[i], created, err = cache[i].FindOrCreate(key, writeString(input[i]))
- require.NoError(t, err)
- defer func(i int) { require.NoError(t, reader[i].Close()) }(i)
- require.True(t, created)
+ go func(i int) {
+ defer wg.Done()
+
+ input[i] = fmt.Sprintf("test content %d", i)
+ cache[i] = newCache(tmp)
+ defer func(i int) { cache[i].Stop() }(i)
+
+ buf := &bytes.Buffer{}
+ _, created, err := cache[i].Fetch(ctx, key, buf, writeString(input[i]))
+ require.NoError(t, err)
+ require.True(t, created)
+ output[i] = buf.String()
+ }(i)
}
+ wg.Wait()
// If different cache instances overwrite their entries, the effect may
// be order dependent, e.g. "last write wins". We could reverse the order
// now to catch that possible bug, but then we only test for one kind of
// bug. Let's shuffle instead, which can catch more hypothetical bugs.
rand.Shuffle(N, func(i, j int) {
- reader[i], reader[j] = reader[j], reader[i]
+ output[i], output[j] = output[j], output[i]
input[i], input[j] = input[j], input[i]
})
for i := 0; i < N; i++ {
- r, content := reader[i], input[i]
-
- out, err := io.ReadAll(r)
- require.NoError(t, err)
- require.NoError(t, r.Wait(ctx))
-
- require.Equal(t, content, string(out))
+ require.Equal(t, input[i], output[i])
}
}
@@ -256,15 +242,11 @@ func TestCache_diskCleanup(t *testing.T) {
content := func(i int) string { return fmt.Sprintf("content %d", i) }
- r1, created, err := c.FindOrCreate(key, writeString(content(1)))
+ out1 := &bytes.Buffer{}
+ _, created, err := c.Fetch(ctx, key, out1, writeString(content(1)))
require.NoError(t, err)
- defer r1.Close()
require.True(t, created)
-
- out1, err := io.ReadAll(r1)
- require.NoError(t, err)
- require.Equal(t, content(1), string(out1))
- require.NoError(t, r1.Wait(ctx))
+ require.Equal(t, content(1), out1.String())
// File and index entry should still exist because cleanup goroutines are blocked.
requireCacheFiles(t, tmp, 1)
@@ -295,17 +277,11 @@ func TestCache_diskCleanup(t *testing.T) {
requireCacheFiles(t, tmp, 0)
requireCacheEntries(t, c, 0)
- r2, created, err := c.FindOrCreate(key, writeString(content(2)))
+ out2 := &bytes.Buffer{}
+ _, created, err = c.Fetch(ctx, key, out2, writeString(content(2)))
require.NoError(t, err)
- defer r2.Close()
require.True(t, created)
-
- out2, err := io.ReadAll(r2)
- require.NoError(t, err)
- require.NoError(t, r2.Wait(ctx))
-
- // Sanity check: no stale value returned by the cache
- require.Equal(t, content(2), string(out2))
+ require.Equal(t, content(2), out2.String(), "Sanity check: no stale value returned by the cache")
}
func TestCache_failedWrite(t *testing.T) {
@@ -332,30 +308,24 @@ func TestCache_failedWrite(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
- r1, created, err := c.FindOrCreate(tc.desc, tc.create)
- require.NoError(t, err)
+ _, created, err := c.Fetch(ctx, tc.desc, io.Discard, tc.create)
+ require.Error(t, err)
require.True(t, created)
- _, err = io.Copy(io.Discard, r1)
- require.NoError(t, err, "errors on the write end are not propagated via Read()")
- require.NoError(t, r1.Close(), "errors on the write end are not propagated via Close()")
- require.Error(t, r1.Wait(ctx), "error propagation happens via Wait()")
-
const happy = "all is good"
- r2, created, err := c.FindOrCreate(tc.desc, writeString(happy))
+ buf := &bytes.Buffer{}
+ _, created, err = c.Fetch(ctx, tc.desc, buf, writeString(happy))
require.NoError(t, err)
- defer r2.Close()
require.True(t, created, "because the previous entry failed, a new one should have been created")
- out, err := io.ReadAll(r2)
- require.NoError(t, err)
- require.NoError(t, r2.Wait(ctx))
- require.Equal(t, happy, string(out))
+ require.Equal(t, happy, buf.String())
})
}
}
func TestCache_failCreateFile(t *testing.T) {
+ ctx := testhelper.Context(t)
+
tmp := testhelper.TempDir(t)
c := newCache(tmp)
@@ -364,7 +334,7 @@ func TestCache_failCreateFile(t *testing.T) {
createError := errors.New("cannot create file")
c.(*cache).createFile = func() (namedWriteCloser, error) { return nil, createError }
- _, _, err := c.FindOrCreate("key", func(io.Writer) error { return nil })
+ _, _, err := c.Fetch(ctx, "key", io.Discard, func(io.Writer) error { return nil })
require.Equal(t, createError, err)
}
@@ -380,17 +350,10 @@ func TestCache_unWriteableFile(t *testing.T) {
return os.OpenFile(filepath.Join(tmp, "unwriteable"), os.O_RDONLY|os.O_CREATE|os.O_EXCL, perm.SharedFile)
}
- r, created, err := c.FindOrCreate("key", func(w io.Writer) error {
+ _, _, err := c.Fetch(ctx, "key", io.Discard, func(w io.Writer) error {
_, err := io.WriteString(w, "hello")
return err
})
- require.NoError(t, err)
- require.True(t, created)
-
- _, err = io.ReadAll(r)
- require.NoError(t, err)
-
- err = r.Wait(ctx)
require.IsType(t, &os.PathError{}, err)
require.Equal(t, "write", err.(*os.PathError).Op)
}
@@ -411,19 +374,14 @@ func TestCache_unCloseableFile(t *testing.T) {
return f, f.Close() // Already closed so cannot be closed again
}
- r, created, err := c.FindOrCreate("key", func(w io.Writer) error { return nil })
- require.NoError(t, err)
- require.True(t, created)
-
- _, err = io.ReadAll(r)
- require.NoError(t, err)
-
- err = r.Wait(ctx)
+ _, _, err := c.Fetch(ctx, "key", io.Discard, func(w io.Writer) error { return nil })
require.IsType(t, &os.PathError{}, err)
require.Equal(t, "close", err.(*os.PathError).Op)
}
func TestCache_cannotOpenFileForReading(t *testing.T) {
+ ctx := testhelper.Context(t)
+
tmp := testhelper.TempDir(t)
c := newCache(tmp)
@@ -437,7 +395,7 @@ func TestCache_cannotOpenFileForReading(t *testing.T) {
return f, os.Remove(f.Name()) // Removed so cannot be opened
}
- _, _, err := c.FindOrCreate("key", func(w io.Writer) error { return nil })
+ _, _, err := c.Fetch(ctx, "key", io.Discard, func(w io.Writer) error { return nil })
err = errors.Unwrap(err)
require.IsType(t, &os.PathError{}, err)
require.Equal(t, "open", err.(*os.PathError).Op)
@@ -490,7 +448,8 @@ func TestNullCache(t *testing.T) {
<-start
- s, created, err := c.FindOrCreate(key, func(w io.Writer) error {
+ output := &bytes.Buffer{}
+ _, created, err := c.Fetch(ctx, key, output, func(w io.Writer) error {
for j := 0; j < len(input); j++ {
n, err := w.Write(input[j : j+1])
if err != nil {
@@ -505,21 +464,16 @@ func TestNullCache(t *testing.T) {
if err != nil {
return err
}
- defer s.Close()
if !created {
return errors.New("created should be true")
}
- output, err := io.ReadAll(s)
- if err != nil {
- return err
- }
- if !bytes.Equal(output, input) {
+ if !bytes.Equal(output.Bytes(), input) {
return errors.New("output does not match input")
}
- return s.Wait(ctx)
+ return nil
}()
}()
}
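
Taken together, the test changes encode the key behavioral shift: errors from the create callback now surface directly from Fetch instead of via Stream.Wait, and callers no longer manage Close on a returned stream. A hedged, test-style sketch of that contract, reusing the newCache helper and testhelper package from this file (the test name itself is illustrative, and the behavior restates what TestCache_failedWrite already covers):

func TestFetch_createErrorSurfacesDirectly(t *testing.T) {
	ctx := testhelper.Context(t)

	c := newCache(testhelper.TempDir(t))
	defer c.Stop()

	// The create callback fails; with the old API this error was only
	// observable via Stream.Wait, now Fetch itself returns it.
	_, created, err := c.Fetch(ctx, "key", io.Discard, func(io.Writer) error {
		return errors.New("create failed")
	})
	require.True(t, created)
	require.Error(t, err)
}
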