diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-09-20 12:18:58 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-10-06 13:39:32 +0300 |
commit | c22be4a9b15afb58879b23e334ef3698c236f7f0 (patch) | |
tree | 587cc1f239e830c1b60a347e014eb762d22de0ce | |
parent | 2ff20d772203555ce7e44e3b37db7f3e9803d988 (diff) |
catfile: Provide cacheable object readers
Provide cacheable object readers such that callers do not have to always
create two separate git-cat-file(1) processes even though they only need
the functionality to read objects.
-rw-r--r-- | internal/git/catfile/batch.go | 2 | ||||
-rw-r--r-- | internal/git/catfile/cache.go | 27 | ||||
-rw-r--r-- | internal/git/catfile/cache_test.go | 136 | ||||
-rw-r--r-- | internal/git/catfile/object_reader.go | 24 | ||||
-rw-r--r-- | internal/git/catfile/object_reader_test.go | 18 |
5 files changed, 196 insertions, 11 deletions
diff --git a/internal/git/catfile/batch.go b/internal/git/catfile/batch.go index 221277c87..eba11feae 100644 --- a/internal/git/catfile/batch.go +++ b/internal/git/catfile/batch.go @@ -127,7 +127,7 @@ func (c *batch) Tag(ctx context.Context, revision git.Revision) (*Object, error) } func (c *batch) typedObjectReader(ctx context.Context, revision git.Revision, expectedType string) (*Object, error) { - object, err := c.objectReader.reader(ctx, revision) + object, err := c.objectReader.Object(ctx, revision) if err != nil { return nil, err } diff --git a/internal/git/catfile/cache.go b/internal/git/catfile/cache.go index ac63f8434..66389feff 100644 --- a/internal/git/catfile/cache.go +++ b/internal/git/catfile/cache.go @@ -31,6 +31,9 @@ type Cache interface { // BatchProcess either creates a new git-cat-file(1) process or returns a cached one for // the given repository. BatchProcess(context.Context, git.RepositoryExecutor) (Batch, error) + // ObjectReader either creates a new object reader or returns a cached one for the given + // repository. + ObjectReader(context.Context, git.RepositoryExecutor) (ObjectReader, error) // Evict evicts all cached processes from the cache. Evict() } @@ -53,6 +56,7 @@ type ProcessCache struct { monitorDone chan interface{} batchProcesses stack + objectReaders stack catfileCacheCounter *prometheus.CounterVec currentCatfileProcesses prometheus.Gauge @@ -81,6 +85,9 @@ func newCache(ttl time.Duration, maxLen int, refreshInterval time.Duration) *Pro batchProcesses: stack{ maxLen: maxLen, }, + objectReaders: stack{ + maxLen: maxLen, + }, catfileCacheCounter: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "gitaly_catfile_cache_total", @@ -141,6 +148,7 @@ func (c *ProcessCache) monitor() { select { case <-c.monitorTicker.C: c.batchProcesses.EnforceTTL(time.Now()) + c.objectReaders.EnforceTTL(time.Now()) case <-c.monitorDone: close(c.monitorDone) return @@ -176,6 +184,23 @@ func (c *ProcessCache) BatchProcess(ctx context.Context, repo git.RepositoryExec return batch, nil } +// ObjectReader creates a new ObjectReader process for the given repository. +func (c *ProcessCache) ObjectReader(ctx context.Context, repo git.RepositoryExecutor) (ObjectReader, error) { + cacheable, err := c.getOrCreateProcess(ctx, repo, &c.objectReaders, func(ctx context.Context) (cacheable, error) { + return newObjectReader(ctx, repo, c.catfileLookupCounter) + }) + if err != nil { + return nil, err + } + + objectReader, ok := cacheable.(ObjectReader) + if !ok { + return nil, fmt.Errorf("expected object reader, got %T", objectReader) + } + + return objectReader, nil +} + func (c *ProcessCache) getOrCreateProcess( ctx context.Context, repo repository.GitRepo, @@ -254,11 +279,13 @@ func (c *ProcessCache) getOrCreateProcess( func (c *ProcessCache) reportCacheMembers() { c.catfileCacheMembers.WithLabelValues("batch").Set(float64(c.batchProcesses.EntryCount())) + c.catfileCacheMembers.WithLabelValues("object_reader").Set(float64(c.objectReaders.EntryCount())) } // Evict evicts all cached processes from the cache. func (c *ProcessCache) Evict() { c.batchProcesses.Evict() + c.objectReaders.Evict() } func (c *ProcessCache) returnWhenDone(done <-chan struct{}, s *stack, cacheKey key, value cacheable, cancel func()) { diff --git a/internal/git/catfile/cache_test.go b/internal/git/catfile/cache_test.go index 82c96b532..0cb105ebf 100644 --- a/internal/git/catfile/cache_test.go +++ b/internal/git/catfile/cache_test.go @@ -340,6 +340,142 @@ func TestCache_BatchProcess(t *testing.T) { }) } +func TestCache_ObjectReader(t *testing.T) { + cfg, repo, _ := testcfg.BuildWithRepo(t) + repoExecutor := newRepoExecutor(t, cfg, repo) + + cache := newCache(time.Hour, 10, time.Hour) + defer cache.Stop() + cache.cachedProcessDone = sync.NewCond(&sync.Mutex{}) + + t.Run("uncancellable", func(t *testing.T) { + ctx := context.Background() + + require.PanicsWithValue(t, "empty ctx.Done() in catfile.Batch.New()", func() { + _, _ = cache.ObjectReader(ctx, repoExecutor) + }) + }) + + t.Run("uncacheable", func(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + // The context doesn't carry a session ID and is thus uncacheable. + // The process should never get returned to the cache and must be + // killed on context cancellation. + reader, err := cache.ObjectReader(ctx, repoExecutor) + require.NoError(t, err) + + objectReaderImpl, ok := reader.(*objectReader) + require.True(t, ok, "expected object reader") + + cancel() + + // We're cheating a bit here to avoid creating a racy test by reaching into the + // process and trying to read from its stdout. If the cancel did kill the process as + // expected, then the stdout should be closed and we'll get an EOF. + output, err := io.ReadAll(objectReaderImpl.stdout) + if err != nil { + require.True(t, errors.Is(err, os.ErrClosed)) + } else { + require.NoError(t, err) + } + require.Empty(t, output) + + require.True(t, reader.isClosed()) + require.Empty(t, keys(t, &cache.objectReaders)) + }) + + t.Run("cacheable", func(t *testing.T) { + defer cache.Evict() + + ctx, cancel := testhelper.Context() + defer cancel() + ctx = correlation.ContextWithCorrelation(ctx, "1") + ctx = testhelper.MergeIncomingMetadata(ctx, + metadata.Pairs(SessionIDField, "1"), + ) + + reader, err := cache.ObjectReader(ctx, repoExecutor) + require.NoError(t, err) + + // Cancel the context such that the process will be considered for return to the + // cache and wait for the cache to collect it. + cache.cachedProcessDone.L.Lock() + cancel() + defer cache.cachedProcessDone.L.Unlock() + cache.cachedProcessDone.Wait() + + keys := keys(t, &cache.objectReaders) + require.Equal(t, []key{{ + sessionID: "1", + repoStorage: repo.GetStorageName(), + repoRelPath: repo.GetRelativePath(), + }}, keys) + + // Assert that we can still read from the cached process. + _, err = reader.Object(ctx, "refs/heads/master") + require.NoError(t, err) + }) + + t.Run("dirty process does not get cached", func(t *testing.T) { + defer cache.Evict() + + ctx, cancel := testhelper.Context() + defer cancel() + ctx = testhelper.MergeIncomingMetadata(ctx, + metadata.Pairs(SessionIDField, "1"), + ) + + reader, err := cache.ObjectReader(ctx, repoExecutor) + require.NoError(t, err) + + // While we request object data, we do not consume it at all. The reader is thus + // dirty and cannot be reused and shouldn't be returned to the cache. + object, err := reader.Object(ctx, "refs/heads/master") + require.NoError(t, err) + + // Cancel the context such that the process will be considered for return to the + // cache and wait for the cache to collect it. + cache.cachedProcessDone.L.Lock() + cancel() + defer cache.cachedProcessDone.L.Unlock() + cache.cachedProcessDone.Wait() + + require.Empty(t, keys(t, &cache.objectReaders)) + + // The process should be killed now, so reading the object must fail. + _, err = io.ReadAll(object) + require.True(t, errors.Is(err, os.ErrClosed)) + }) + + t.Run("closed process does not get cached", func(t *testing.T) { + defer cache.Evict() + + ctx, cancel := testhelper.Context() + defer cancel() + ctx = testhelper.MergeIncomingMetadata(ctx, + metadata.Pairs(SessionIDField, "1"), + ) + + reader, err := cache.ObjectReader(ctx, repoExecutor) + require.NoError(t, err) + + // Closed processes naturally cannot be reused anymore and thus shouldn't ever get + // cached. + reader.close() + + // Cancel the context such that the process will be considered for return to the + // cache and wait for the cache to collect it. + cache.cachedProcessDone.L.Lock() + cancel() + defer cache.cachedProcessDone.L.Unlock() + cache.cachedProcessDone.Wait() + + require.Empty(t, keys(t, &cache.objectReaders)) + }) +} + func requireStackValid(t *testing.T, s *stack) { s.entriesMutex.Lock() defer s.entriesMutex.Unlock() diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go index 024b423c4..a87a118b1 100644 --- a/internal/git/catfile/object_reader.go +++ b/internal/git/catfile/object_reader.go @@ -21,6 +21,15 @@ type Object struct { io.Reader } +// ObjectReader is a reader for Git objects. +type ObjectReader interface { + cacheable + + // Reader returns a new Object for the given revision. The Object must be fully consumed + // before another object is requested. + Object(_ context.Context, _ git.Revision) (*Object, error) +} + // objectReader is a reader for Git objects. Reading is implemented via a long-lived `git cat-file // --batch` process such that we do not have to spawn a new process for each object we are about to // read. @@ -41,6 +50,8 @@ type objectReader struct { // unpredictable way. sync.Mutex + closed bool + // creationCtx is the context in which this reader has been created. This context may // potentially be decorrelated from the "real" RPC context in case the reader is going to be // cached. @@ -84,10 +95,21 @@ func newObjectReader( } func (o *objectReader) close() { + o.Lock() + defer o.Unlock() + _ = o.cmd.Wait() + + o.closed = true +} + +func (o *objectReader) isClosed() bool { + o.Lock() + defer o.Unlock() + return o.closed } -func (o *objectReader) reader( +func (o *objectReader) Object( ctx context.Context, revision git.Revision, ) (*Object, error) { diff --git a/internal/git/catfile/object_reader_test.go b/internal/git/catfile/object_reader_test.go index bc092aa0d..aad733f78 100644 --- a/internal/git/catfile/object_reader_test.go +++ b/internal/git/catfile/object_reader_test.go @@ -29,7 +29,7 @@ func TestObjectReader_reader(t *testing.T) { reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) require.NoError(t, err) - object, err := reader.reader(ctx, "refs/heads/master") + object, err := reader.Object(ctx, "refs/heads/master") require.NoError(t, err) data, err := io.ReadAll(object) @@ -41,7 +41,7 @@ func TestObjectReader_reader(t *testing.T) { reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) require.NoError(t, err) - object, err := reader.reader(ctx, commitID.Revision()) + object, err := reader.Object(ctx, commitID.Revision()) require.NoError(t, err) data, err := io.ReadAll(object) @@ -54,11 +54,11 @@ func TestObjectReader_reader(t *testing.T) { reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) require.NoError(t, err) - _, err = reader.reader(ctx, "refs/heads/does-not-exist") + _, err = reader.Object(ctx, "refs/heads/does-not-exist") require.EqualError(t, err, "object not found") // Verify that we're still able to read a commit after the previous read has failed. - object, err := reader.reader(ctx, commitID.Revision()) + object, err := reader.Object(ctx, commitID.Revision()) require.NoError(t, err) data, err := io.ReadAll(object) @@ -71,11 +71,11 @@ func TestObjectReader_reader(t *testing.T) { reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) require.NoError(t, err) - _, err = reader.reader(ctx, commitID.Revision()) + _, err = reader.Object(ctx, commitID.Revision()) require.NoError(t, err) // We haven't yet consumed the previous object, so this must now fail. - _, err = reader.reader(ctx, commitID.Revision()) + _, err = reader.Object(ctx, commitID.Revision()) require.EqualError(t, err, fmt.Sprintf("cannot create new Object: batch contains %d unread bytes", len(commitContents)+1)) }) @@ -83,14 +83,14 @@ func TestObjectReader_reader(t *testing.T) { reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) require.NoError(t, err) - object, err := reader.reader(ctx, commitID.Revision()) + object, err := reader.Object(ctx, commitID.Revision()) require.NoError(t, err) _, err = io.CopyN(io.Discard, object, 100) require.NoError(t, err) // We haven't yet consumed the previous object, so this must now fail. - _, err = reader.reader(ctx, commitID.Revision()) + _, err = reader.Object(ctx, commitID.Revision()) require.EqualError(t, err, fmt.Sprintf("cannot create new Object: batch contains %d unread bytes", len(commitContents)-100+1)) }) @@ -108,7 +108,7 @@ func TestObjectReader_reader(t *testing.T) { } { require.Equal(t, float64(0), testutil.ToFloat64(counter.WithLabelValues(objectType))) - object, err := reader.reader(ctx, revision) + object, err := reader.Object(ctx, revision) require.NoError(t, err) require.Equal(t, float64(1), testutil.ToFloat64(counter.WithLabelValues(objectType))) |