diff options
author | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-03-14 08:32:28 +0300 |
---|---|---|
committer | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-03-14 08:32:28 +0300 |
commit | 220959f0ffc3d01fa448cc2c7b45b082d56690ef (patch) | |
tree | 83d80401941536ea1f39b40a2ec2d250271a5c61 /internal/streamcache | |
parent | 906fdaa2fd6f4dc4baec1b36b9341f4157e2042a (diff) | |
parent | de53fe2f5c773a059b599674ccaf459b5daf4b49 (diff) |
Merge branch 'jv-streamcache-interface' into 'master'
Simplify streamcache.Cache for efficient null implementation
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5444
Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: John Cai <jcai@gitlab.com>
Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Co-authored-by: Jacob Vosmaer <jacob@gitlab.com>
Diffstat (limited to 'internal/streamcache')
-rw-r--r-- | internal/streamcache/cache.go | 97 | ||||
-rw-r--r-- | internal/streamcache/cache_test.go | 162 |
2 files changed, 102 insertions, 157 deletions
diff --git a/internal/streamcache/cache.go b/internal/streamcache/cache.go index 857140220..99459c806 100644 --- a/internal/streamcache/cache.go +++ b/internal/streamcache/cache.go @@ -38,6 +38,7 @@ import ( "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/v15/internal/dontpanic" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v15/internal/helper" ) var ( @@ -60,10 +61,12 @@ var ( // Cache is a cache for large byte streams. type Cache interface { - // FindOrCreate finds or creates a cache entry. If the create callback - // runs, it will be asynchronous and created is set to true. Callers must - // Close() the returned stream to free underlying resources. - FindOrCreate(key string, create func(io.Writer) error) (s *Stream, created bool, err error) + // Fetch finds or creates a cache entry and writes its contents into dst. + // If the create callback is called the created return value is true. In + // case of a non-nil error return, the create callback may still be + // running in a goroutine for the benefit of another caller of Fetch with + // the same key. + Fetch(ctx context.Context, key string, dst io.Writer, create func(io.Writer) error) (written int64, created bool, err error) // Stop stops the cleanup goroutines of the cache. Stop() } @@ -86,15 +89,15 @@ type TestLoggingCache struct { m sync.Mutex } -// FindOrCreate calls the underlying FindOrCreate method and logs the +// Fetch calls the underlying Fetch method and logs the // result. -func (tlc *TestLoggingCache) FindOrCreate(key string, create func(io.Writer) error) (s *Stream, created bool, err error) { - s, created, err = tlc.Cache.FindOrCreate(key, create) +func (tlc *TestLoggingCache) Fetch(ctx context.Context, key string, dst io.Writer, create func(io.Writer) error) (written int64, created bool, err error) { + written, created, err = tlc.Cache.Fetch(ctx, key, dst, create) tlc.m.Lock() defer tlc.m.Unlock() tlc.entries = append(tlc.entries, &TestLogEntry{Key: key, Created: created, Err: err}) - return s, created, err + return written, created, err } // Entries returns a reference to the log of entries observed so far. @@ -112,13 +115,11 @@ var _ = Cache(NullCache{}) // and it uses no storage. type NullCache struct{} -// FindOrCreate runs create in a goroutine and lets the caller consume -// the result via the returned stream. The created flag is always true. -func (NullCache) FindOrCreate(key string, create func(io.Writer) error) (s *Stream, created bool, err error) { - pr, pw := io.Pipe() - w := newWaiter() - go func() { w.SetError(runCreate(pw, create)) }() - return &Stream{ReadCloser: pr, waiter: w}, true, nil +// Fetch runs create(dst). The created flag is always true. +func (NullCache) Fetch(ctx context.Context, key string, dst io.Writer, create func(io.Writer) error) (written int64, created bool, err error) { + w := &helper.CountingWriter{W: dst} + err = create(w) + return w.N, true, err } // Stop is a no-op. @@ -227,13 +228,33 @@ func (c *cache) setIndexSize() { cacheIndexSize.WithLabelValues(c.dir).Set(float64(len(c.index))) } -func (c *cache) FindOrCreate(key string, create func(io.Writer) error) (s *Stream, created bool, err error) { +func (c *cache) Fetch(ctx context.Context, key string, dst io.Writer, create func(io.Writer) error) (written int64, created bool, err error) { + var ( + rc io.ReadCloser + wt *waiter + ) + rc, wt, created, err = c.getStream(key, create) + if err != nil { + return + } + defer rc.Close() + + written, err = io.Copy(dst, rc) + if err != nil { + return + } + + err = wt.Wait(ctx) + return +} + +func (c *cache) getStream(key string, create func(io.Writer) error) (_ io.ReadCloser, _ *waiter, created bool, err error) { c.m.Lock() defer c.m.Unlock() if e := c.index[key]; e != nil { - if s, err := e.Open(); err == nil { - return s, false, nil + if r, err := e.pipe.OpenReader(); err == nil { + return r, e.waiter, false, nil } // In this case err != nil. That is allowed to happen, for instance if @@ -243,15 +264,15 @@ func (c *cache) FindOrCreate(key string, create func(io.Writer) error) (s *Strea c.delete(key) } - s, e, err := c.newEntry(key, create) + r, e, err := c.newEntry(key, create) if err != nil { - return nil, false, err + return nil, nil, false, err } c.index[key] = e c.setIndexSize() - return s, true, nil + return r, e.waiter, true, nil } type entry struct { @@ -262,28 +283,7 @@ type entry struct { waiter *waiter } -// Stream abstracts a stream of bytes (via Read()) plus an error (via -// Wait()). Callers must always call Close() to prevent resource leaks. -type Stream struct { - waiter *waiter - io.ReadCloser -} - -// Wait returns the error value of the Stream. If ctx is canceled, -// Wait unblocks and returns early. -func (s *Stream) Wait(ctx context.Context) error { return s.waiter.Wait(ctx) } - -// WriteTo implements io.WriterTo. For some w on some platforms, this -// uses sendfile to make copying data more efficient. -func (s *Stream) WriteTo(w io.Writer) (int64, error) { - if wt, ok := s.ReadCloser.(io.WriterTo); ok { - return wt.WriteTo(w) - } - - return io.Copy(w, s.ReadCloser) -} - -func (c *cache) newEntry(key string, create func(io.Writer) error) (_ *Stream, _ *entry, err error) { +func (c *cache) newEntry(key string, create func(io.Writer) error) (_ io.ReadCloser, _ *entry, err error) { e := &entry{ key: key, cache: c, @@ -333,11 +333,7 @@ func (c *cache) newEntry(key string, create func(io.Writer) error) (_ *Stream, _ } }() - return e.wrapReadCloser(pr), e, nil -} - -func (e *entry) wrapReadCloser(r io.ReadCloser) *Stream { - return &Stream{ReadCloser: r, waiter: e.waiter} + return pr, e, nil } func runCreate(w io.WriteCloser, create func(io.Writer) error) (err error) { @@ -363,11 +359,6 @@ func runCreate(w io.WriteCloser, create func(io.Writer) error) (err error) { return nil } -func (e *entry) Open() (*Stream, error) { - r, err := e.pipe.OpenReader() - return e.wrapReadCloser(r), err -} - type waiter struct { done chan struct{} err error diff --git a/internal/streamcache/cache_test.go b/internal/streamcache/cache_test.go index 9a2bb3603..8d0d0dc82 100644 --- a/internal/streamcache/cache_test.go +++ b/internal/streamcache/cache_test.go @@ -46,16 +46,11 @@ func TestCache_writeOneReadMultiple(t *testing.T) { for i := 0; i < N; i++ { t.Run(fmt.Sprintf("read %d", i), func(t *testing.T) { - r, created, err := c.FindOrCreate(key, writeString(content(i))) + buf := &bytes.Buffer{} + _, created, err := c.Fetch(ctx, key, buf, writeString(content(i))) require.NoError(t, err) - defer r.Close() - require.Equal(t, i == 0, created, "all calls except the first one should be cache hits") - - out, err := io.ReadAll(r) - require.NoError(t, err) - require.NoError(t, r.Wait(ctx)) - require.Equal(t, content(0), string(out), "expect cache hits for all i > 0") + require.Equal(t, content(0), buf.String(), "expect cache hits for all i > 0") }) } @@ -88,19 +83,14 @@ func TestCache_manyConcurrentWrites(t *testing.T) { errors <- func() error { <-start - r, _, err := c.FindOrCreate(key, writeString(content[i])) - if err != nil { - return err - } - defer r.Close() - - out, err := io.ReadAll(r) + buf := &bytes.Buffer{} + _, _, err := c.Fetch(ctx, key, buf, writeString(content[i])) if err != nil { return err } - output[i] = string(out) - return r.Wait(ctx) + output[i] = buf.String() + return nil }() }(i) } @@ -113,10 +103,10 @@ func TestCache_manyConcurrentWrites(t *testing.T) { } for i := 0; i < N; i++ { - require.Equal(t, output[0], output[i], "all calls to FindOrCreate returned the same bytes") + require.Equal(t, output[0], output[i], "all calls to Fetch returned the same bytes") } - require.Contains(t, content, output[0], "data returned by FindOrCreate is not mangled") + require.Contains(t, content, output[0], "data returned by Fetch is not mangled") requireCacheFiles(t, tmp, 1) } @@ -145,6 +135,7 @@ func requireCacheEntries(t *testing.T, _c Cache, n int) { func TestCache_deletedFile(t *testing.T) { tmp := testhelper.TempDir(t) + ctx := testhelper.Context(t) c := newCache(tmp) defer c.Stop() @@ -154,9 +145,9 @@ func TestCache_deletedFile(t *testing.T) { ) content := func(i int) string { return fmt.Sprintf("content %d", i) } - r1, created, err := c.FindOrCreate(key, writeString(content(1))) + buf1 := &bytes.Buffer{} + _, created, err := c.Fetch(ctx, key, buf1, writeString(content(1))) require.NoError(t, err) - defer r1.Close() require.True(t, created) require.NoError(t, os.RemoveAll(tmp), "wipe out underlying files of cache") @@ -166,18 +157,13 @@ func TestCache_deletedFile(t *testing.T) { requireCacheFiles(t, tmp, 0) requireCacheEntries(t, c, 1) - r2, created, err := c.FindOrCreate(key, writeString(content(2))) + buf2 := &bytes.Buffer{} + _, created, err = c.Fetch(ctx, key, buf2, writeString(content(2))) require.NoError(t, err) - defer r2.Close() require.True(t, created, "because the first file is gone, cache is forced to create a new entry") - out1, err := io.ReadAll(r1) - require.NoError(t, err) - require.Equal(t, content(1), string(out1), "r1 should still see its original pre-wipe contents") - - out2, err := io.ReadAll(r2) - require.NoError(t, err) - require.Equal(t, content(2), string(out2), "r2 should see the new post-wipe contents") + require.Equal(t, content(1), buf1.String(), "r1 should still see its original pre-wipe contents") + require.Equal(t, content(2), buf2.String(), "r2 should see the new post-wipe contents") } func TestCache_scope(t *testing.T) { @@ -194,38 +180,38 @@ func TestCache_scope(t *testing.T) { // to test that they do not trample on each others files. cache := make([]Cache, N) input := make([]string, N) - reader := make([]*Stream, N) - var err error + output := make([]string, N) + wg := &sync.WaitGroup{} + wg.Add(N) for i := 0; i < N; i++ { - input[i] = fmt.Sprintf("test content %d", i) - cache[i] = newCache(tmp) - defer func(i int) { cache[i].Stop() }(i) - - var created bool - reader[i], created, err = cache[i].FindOrCreate(key, writeString(input[i])) - require.NoError(t, err) - defer func(i int) { require.NoError(t, reader[i].Close()) }(i) - require.True(t, created) + go func(i int) { + defer wg.Done() + + input[i] = fmt.Sprintf("test content %d", i) + cache[i] = newCache(tmp) + defer func(i int) { cache[i].Stop() }(i) + + buf := &bytes.Buffer{} + _, created, err := cache[i].Fetch(ctx, key, buf, writeString(input[i])) + require.NoError(t, err) + require.True(t, created) + output[i] = buf.String() + }(i) } + wg.Wait() // If different cache instances overwrite their entries, the effect may // be order dependent, e.g. "last write wins". We could reverse the order // now to catch that possible bug, but then we only test for one kind of // bug. Let's shuffle instead, which can catch more hypothetical bugs. rand.Shuffle(N, func(i, j int) { - reader[i], reader[j] = reader[j], reader[i] + output[i], output[j] = output[j], output[i] input[i], input[j] = input[j], input[i] }) for i := 0; i < N; i++ { - r, content := reader[i], input[i] - - out, err := io.ReadAll(r) - require.NoError(t, err) - require.NoError(t, r.Wait(ctx)) - - require.Equal(t, content, string(out)) + require.Equal(t, input[i], output[i]) } } @@ -256,15 +242,11 @@ func TestCache_diskCleanup(t *testing.T) { content := func(i int) string { return fmt.Sprintf("content %d", i) } - r1, created, err := c.FindOrCreate(key, writeString(content(1))) + out1 := &bytes.Buffer{} + _, created, err := c.Fetch(ctx, key, out1, writeString(content(1))) require.NoError(t, err) - defer r1.Close() require.True(t, created) - - out1, err := io.ReadAll(r1) - require.NoError(t, err) - require.Equal(t, content(1), string(out1)) - require.NoError(t, r1.Wait(ctx)) + require.Equal(t, content(1), out1.String()) // File and index entry should still exist because cleanup goroutines are blocked. requireCacheFiles(t, tmp, 1) @@ -295,17 +277,11 @@ func TestCache_diskCleanup(t *testing.T) { requireCacheFiles(t, tmp, 0) requireCacheEntries(t, c, 0) - r2, created, err := c.FindOrCreate(key, writeString(content(2))) + out2 := &bytes.Buffer{} + _, created, err = c.Fetch(ctx, key, out2, writeString(content(2))) require.NoError(t, err) - defer r2.Close() require.True(t, created) - - out2, err := io.ReadAll(r2) - require.NoError(t, err) - require.NoError(t, r2.Wait(ctx)) - - // Sanity check: no stale value returned by the cache - require.Equal(t, content(2), string(out2)) + require.Equal(t, content(2), out2.String(), "Sanity check: no stale value returned by the cache") } func TestCache_failedWrite(t *testing.T) { @@ -332,30 +308,24 @@ func TestCache_failedWrite(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - r1, created, err := c.FindOrCreate(tc.desc, tc.create) - require.NoError(t, err) + _, created, err := c.Fetch(ctx, tc.desc, io.Discard, tc.create) + require.Error(t, err) require.True(t, created) - _, err = io.Copy(io.Discard, r1) - require.NoError(t, err, "errors on the write end are not propagated via Read()") - require.NoError(t, r1.Close(), "errors on the write end are not propagated via Close()") - require.Error(t, r1.Wait(ctx), "error propagation happens via Wait()") - const happy = "all is good" - r2, created, err := c.FindOrCreate(tc.desc, writeString(happy)) + buf := &bytes.Buffer{} + _, created, err = c.Fetch(ctx, tc.desc, buf, writeString(happy)) require.NoError(t, err) - defer r2.Close() require.True(t, created, "because the previous entry failed, a new one should have been created") - out, err := io.ReadAll(r2) - require.NoError(t, err) - require.NoError(t, r2.Wait(ctx)) - require.Equal(t, happy, string(out)) + require.Equal(t, happy, buf.String()) }) } } func TestCache_failCreateFile(t *testing.T) { + ctx := testhelper.Context(t) + tmp := testhelper.TempDir(t) c := newCache(tmp) @@ -364,7 +334,7 @@ func TestCache_failCreateFile(t *testing.T) { createError := errors.New("cannot create file") c.(*cache).createFile = func() (namedWriteCloser, error) { return nil, createError } - _, _, err := c.FindOrCreate("key", func(io.Writer) error { return nil }) + _, _, err := c.Fetch(ctx, "key", io.Discard, func(io.Writer) error { return nil }) require.Equal(t, createError, err) } @@ -380,17 +350,10 @@ func TestCache_unWriteableFile(t *testing.T) { return os.OpenFile(filepath.Join(tmp, "unwriteable"), os.O_RDONLY|os.O_CREATE|os.O_EXCL, perm.SharedFile) } - r, created, err := c.FindOrCreate("key", func(w io.Writer) error { + _, _, err := c.Fetch(ctx, "key", io.Discard, func(w io.Writer) error { _, err := io.WriteString(w, "hello") return err }) - require.NoError(t, err) - require.True(t, created) - - _, err = io.ReadAll(r) - require.NoError(t, err) - - err = r.Wait(ctx) require.IsType(t, &os.PathError{}, err) require.Equal(t, "write", err.(*os.PathError).Op) } @@ -411,19 +374,14 @@ func TestCache_unCloseableFile(t *testing.T) { return f, f.Close() // Already closed so cannot be closed again } - r, created, err := c.FindOrCreate("key", func(w io.Writer) error { return nil }) - require.NoError(t, err) - require.True(t, created) - - _, err = io.ReadAll(r) - require.NoError(t, err) - - err = r.Wait(ctx) + _, _, err := c.Fetch(ctx, "key", io.Discard, func(w io.Writer) error { return nil }) require.IsType(t, &os.PathError{}, err) require.Equal(t, "close", err.(*os.PathError).Op) } func TestCache_cannotOpenFileForReading(t *testing.T) { + ctx := testhelper.Context(t) + tmp := testhelper.TempDir(t) c := newCache(tmp) @@ -437,7 +395,7 @@ func TestCache_cannotOpenFileForReading(t *testing.T) { return f, os.Remove(f.Name()) // Removed so cannot be opened } - _, _, err := c.FindOrCreate("key", func(w io.Writer) error { return nil }) + _, _, err := c.Fetch(ctx, "key", io.Discard, func(w io.Writer) error { return nil }) err = errors.Unwrap(err) require.IsType(t, &os.PathError{}, err) require.Equal(t, "open", err.(*os.PathError).Op) @@ -490,7 +448,8 @@ func TestNullCache(t *testing.T) { <-start - s, created, err := c.FindOrCreate(key, func(w io.Writer) error { + output := &bytes.Buffer{} + _, created, err := c.Fetch(ctx, key, output, func(w io.Writer) error { for j := 0; j < len(input); j++ { n, err := w.Write(input[j : j+1]) if err != nil { @@ -505,21 +464,16 @@ func TestNullCache(t *testing.T) { if err != nil { return err } - defer s.Close() if !created { return errors.New("created should be true") } - output, err := io.ReadAll(s) - if err != nil { - return err - } - if !bytes.Equal(output, input) { + if !bytes.Equal(output.Bytes(), input) { return errors.New("output does not match input") } - return s.Wait(ctx) + return nil }() }() } |