Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-09-20 12:18:58 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-10-06 13:39:32 +0300
commitc22be4a9b15afb58879b23e334ef3698c236f7f0 (patch)
tree587cc1f239e830c1b60a347e014eb762d22de0ce
parent2ff20d772203555ce7e44e3b37db7f3e9803d988 (diff)
catfile: Provide cacheable object readers
Provide cacheable object readers such that callers do not have to always create two separate git-cat-file(1) processes even though they only need the functionality to read objects.
-rw-r--r--internal/git/catfile/batch.go2
-rw-r--r--internal/git/catfile/cache.go27
-rw-r--r--internal/git/catfile/cache_test.go136
-rw-r--r--internal/git/catfile/object_reader.go24
-rw-r--r--internal/git/catfile/object_reader_test.go18
5 files changed, 196 insertions, 11 deletions
diff --git a/internal/git/catfile/batch.go b/internal/git/catfile/batch.go
index 221277c87..eba11feae 100644
--- a/internal/git/catfile/batch.go
+++ b/internal/git/catfile/batch.go
@@ -127,7 +127,7 @@ func (c *batch) Tag(ctx context.Context, revision git.Revision) (*Object, error)
}
func (c *batch) typedObjectReader(ctx context.Context, revision git.Revision, expectedType string) (*Object, error) {
- object, err := c.objectReader.reader(ctx, revision)
+ object, err := c.objectReader.Object(ctx, revision)
if err != nil {
return nil, err
}
diff --git a/internal/git/catfile/cache.go b/internal/git/catfile/cache.go
index ac63f8434..66389feff 100644
--- a/internal/git/catfile/cache.go
+++ b/internal/git/catfile/cache.go
@@ -31,6 +31,9 @@ type Cache interface {
// BatchProcess either creates a new git-cat-file(1) process or returns a cached one for
// the given repository.
BatchProcess(context.Context, git.RepositoryExecutor) (Batch, error)
+ // ObjectReader either creates a new object reader or returns a cached one for the given
+ // repository.
+ ObjectReader(context.Context, git.RepositoryExecutor) (ObjectReader, error)
// Evict evicts all cached processes from the cache.
Evict()
}
@@ -53,6 +56,7 @@ type ProcessCache struct {
monitorDone chan interface{}
batchProcesses stack
+ objectReaders stack
catfileCacheCounter *prometheus.CounterVec
currentCatfileProcesses prometheus.Gauge
@@ -81,6 +85,9 @@ func newCache(ttl time.Duration, maxLen int, refreshInterval time.Duration) *Pro
batchProcesses: stack{
maxLen: maxLen,
},
+ objectReaders: stack{
+ maxLen: maxLen,
+ },
catfileCacheCounter: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gitaly_catfile_cache_total",
@@ -141,6 +148,7 @@ func (c *ProcessCache) monitor() {
select {
case <-c.monitorTicker.C:
c.batchProcesses.EnforceTTL(time.Now())
+ c.objectReaders.EnforceTTL(time.Now())
case <-c.monitorDone:
close(c.monitorDone)
return
@@ -176,6 +184,23 @@ func (c *ProcessCache) BatchProcess(ctx context.Context, repo git.RepositoryExec
return batch, nil
}
+// ObjectReader creates a new ObjectReader process for the given repository.
+func (c *ProcessCache) ObjectReader(ctx context.Context, repo git.RepositoryExecutor) (ObjectReader, error) {
+ cacheable, err := c.getOrCreateProcess(ctx, repo, &c.objectReaders, func(ctx context.Context) (cacheable, error) {
+ return newObjectReader(ctx, repo, c.catfileLookupCounter)
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ objectReader, ok := cacheable.(ObjectReader)
+ if !ok {
+ return nil, fmt.Errorf("expected object reader, got %T", objectReader)
+ }
+
+ return objectReader, nil
+}
+
func (c *ProcessCache) getOrCreateProcess(
ctx context.Context,
repo repository.GitRepo,
@@ -254,11 +279,13 @@ func (c *ProcessCache) getOrCreateProcess(
func (c *ProcessCache) reportCacheMembers() {
c.catfileCacheMembers.WithLabelValues("batch").Set(float64(c.batchProcesses.EntryCount()))
+ c.catfileCacheMembers.WithLabelValues("object_reader").Set(float64(c.objectReaders.EntryCount()))
}
// Evict evicts all cached processes from the cache.
func (c *ProcessCache) Evict() {
c.batchProcesses.Evict()
+ c.objectReaders.Evict()
}
func (c *ProcessCache) returnWhenDone(done <-chan struct{}, s *stack, cacheKey key, value cacheable, cancel func()) {
diff --git a/internal/git/catfile/cache_test.go b/internal/git/catfile/cache_test.go
index 82c96b532..0cb105ebf 100644
--- a/internal/git/catfile/cache_test.go
+++ b/internal/git/catfile/cache_test.go
@@ -340,6 +340,142 @@ func TestCache_BatchProcess(t *testing.T) {
})
}
+func TestCache_ObjectReader(t *testing.T) {
+ cfg, repo, _ := testcfg.BuildWithRepo(t)
+ repoExecutor := newRepoExecutor(t, cfg, repo)
+
+ cache := newCache(time.Hour, 10, time.Hour)
+ defer cache.Stop()
+ cache.cachedProcessDone = sync.NewCond(&sync.Mutex{})
+
+ t.Run("uncancellable", func(t *testing.T) {
+ ctx := context.Background()
+
+ require.PanicsWithValue(t, "empty ctx.Done() in catfile.Batch.New()", func() {
+ _, _ = cache.ObjectReader(ctx, repoExecutor)
+ })
+ })
+
+ t.Run("uncacheable", func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ // The context doesn't carry a session ID and is thus uncacheable.
+ // The process should never get returned to the cache and must be
+ // killed on context cancellation.
+ reader, err := cache.ObjectReader(ctx, repoExecutor)
+ require.NoError(t, err)
+
+ objectReaderImpl, ok := reader.(*objectReader)
+ require.True(t, ok, "expected object reader")
+
+ cancel()
+
+ // We're cheating a bit here to avoid creating a racy test by reaching into the
+ // process and trying to read from its stdout. If the cancel did kill the process as
+ // expected, then the stdout should be closed and we'll get an EOF.
+ output, err := io.ReadAll(objectReaderImpl.stdout)
+ if err != nil {
+ require.True(t, errors.Is(err, os.ErrClosed))
+ } else {
+ require.NoError(t, err)
+ }
+ require.Empty(t, output)
+
+ require.True(t, reader.isClosed())
+ require.Empty(t, keys(t, &cache.objectReaders))
+ })
+
+ t.Run("cacheable", func(t *testing.T) {
+ defer cache.Evict()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+ ctx = correlation.ContextWithCorrelation(ctx, "1")
+ ctx = testhelper.MergeIncomingMetadata(ctx,
+ metadata.Pairs(SessionIDField, "1"),
+ )
+
+ reader, err := cache.ObjectReader(ctx, repoExecutor)
+ require.NoError(t, err)
+
+ // Cancel the context such that the process will be considered for return to the
+ // cache and wait for the cache to collect it.
+ cache.cachedProcessDone.L.Lock()
+ cancel()
+ defer cache.cachedProcessDone.L.Unlock()
+ cache.cachedProcessDone.Wait()
+
+ keys := keys(t, &cache.objectReaders)
+ require.Equal(t, []key{{
+ sessionID: "1",
+ repoStorage: repo.GetStorageName(),
+ repoRelPath: repo.GetRelativePath(),
+ }}, keys)
+
+ // Assert that we can still read from the cached process.
+ _, err = reader.Object(ctx, "refs/heads/master")
+ require.NoError(t, err)
+ })
+
+ t.Run("dirty process does not get cached", func(t *testing.T) {
+ defer cache.Evict()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+ ctx = testhelper.MergeIncomingMetadata(ctx,
+ metadata.Pairs(SessionIDField, "1"),
+ )
+
+ reader, err := cache.ObjectReader(ctx, repoExecutor)
+ require.NoError(t, err)
+
+ // While we request object data, we do not consume it at all. The reader is thus
+ // dirty and cannot be reused and shouldn't be returned to the cache.
+ object, err := reader.Object(ctx, "refs/heads/master")
+ require.NoError(t, err)
+
+ // Cancel the context such that the process will be considered for return to the
+ // cache and wait for the cache to collect it.
+ cache.cachedProcessDone.L.Lock()
+ cancel()
+ defer cache.cachedProcessDone.L.Unlock()
+ cache.cachedProcessDone.Wait()
+
+ require.Empty(t, keys(t, &cache.objectReaders))
+
+ // The process should be killed now, so reading the object must fail.
+ _, err = io.ReadAll(object)
+ require.True(t, errors.Is(err, os.ErrClosed))
+ })
+
+ t.Run("closed process does not get cached", func(t *testing.T) {
+ defer cache.Evict()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+ ctx = testhelper.MergeIncomingMetadata(ctx,
+ metadata.Pairs(SessionIDField, "1"),
+ )
+
+ reader, err := cache.ObjectReader(ctx, repoExecutor)
+ require.NoError(t, err)
+
+ // Closed processes naturally cannot be reused anymore and thus shouldn't ever get
+ // cached.
+ reader.close()
+
+ // Cancel the context such that the process will be considered for return to the
+ // cache and wait for the cache to collect it.
+ cache.cachedProcessDone.L.Lock()
+ cancel()
+ defer cache.cachedProcessDone.L.Unlock()
+ cache.cachedProcessDone.Wait()
+
+ require.Empty(t, keys(t, &cache.objectReaders))
+ })
+}
+
func requireStackValid(t *testing.T, s *stack) {
s.entriesMutex.Lock()
defer s.entriesMutex.Unlock()
diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go
index 024b423c4..a87a118b1 100644
--- a/internal/git/catfile/object_reader.go
+++ b/internal/git/catfile/object_reader.go
@@ -21,6 +21,15 @@ type Object struct {
io.Reader
}
+// ObjectReader is a reader for Git objects.
+type ObjectReader interface {
+ cacheable
+
+ // Reader returns a new Object for the given revision. The Object must be fully consumed
+ // before another object is requested.
+ Object(_ context.Context, _ git.Revision) (*Object, error)
+}
+
// objectReader is a reader for Git objects. Reading is implemented via a long-lived `git cat-file
// --batch` process such that we do not have to spawn a new process for each object we are about to
// read.
@@ -41,6 +50,8 @@ type objectReader struct {
// unpredictable way.
sync.Mutex
+ closed bool
+
// creationCtx is the context in which this reader has been created. This context may
// potentially be decorrelated from the "real" RPC context in case the reader is going to be
// cached.
@@ -84,10 +95,21 @@ func newObjectReader(
}
func (o *objectReader) close() {
+ o.Lock()
+ defer o.Unlock()
+
_ = o.cmd.Wait()
+
+ o.closed = true
+}
+
+func (o *objectReader) isClosed() bool {
+ o.Lock()
+ defer o.Unlock()
+ return o.closed
}
-func (o *objectReader) reader(
+func (o *objectReader) Object(
ctx context.Context,
revision git.Revision,
) (*Object, error) {
diff --git a/internal/git/catfile/object_reader_test.go b/internal/git/catfile/object_reader_test.go
index bc092aa0d..aad733f78 100644
--- a/internal/git/catfile/object_reader_test.go
+++ b/internal/git/catfile/object_reader_test.go
@@ -29,7 +29,7 @@ func TestObjectReader_reader(t *testing.T) {
reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
require.NoError(t, err)
- object, err := reader.reader(ctx, "refs/heads/master")
+ object, err := reader.Object(ctx, "refs/heads/master")
require.NoError(t, err)
data, err := io.ReadAll(object)
@@ -41,7 +41,7 @@ func TestObjectReader_reader(t *testing.T) {
reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
require.NoError(t, err)
- object, err := reader.reader(ctx, commitID.Revision())
+ object, err := reader.Object(ctx, commitID.Revision())
require.NoError(t, err)
data, err := io.ReadAll(object)
@@ -54,11 +54,11 @@ func TestObjectReader_reader(t *testing.T) {
reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
require.NoError(t, err)
- _, err = reader.reader(ctx, "refs/heads/does-not-exist")
+ _, err = reader.Object(ctx, "refs/heads/does-not-exist")
require.EqualError(t, err, "object not found")
// Verify that we're still able to read a commit after the previous read has failed.
- object, err := reader.reader(ctx, commitID.Revision())
+ object, err := reader.Object(ctx, commitID.Revision())
require.NoError(t, err)
data, err := io.ReadAll(object)
@@ -71,11 +71,11 @@ func TestObjectReader_reader(t *testing.T) {
reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
require.NoError(t, err)
- _, err = reader.reader(ctx, commitID.Revision())
+ _, err = reader.Object(ctx, commitID.Revision())
require.NoError(t, err)
// We haven't yet consumed the previous object, so this must now fail.
- _, err = reader.reader(ctx, commitID.Revision())
+ _, err = reader.Object(ctx, commitID.Revision())
require.EqualError(t, err, fmt.Sprintf("cannot create new Object: batch contains %d unread bytes", len(commitContents)+1))
})
@@ -83,14 +83,14 @@ func TestObjectReader_reader(t *testing.T) {
reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
require.NoError(t, err)
- object, err := reader.reader(ctx, commitID.Revision())
+ object, err := reader.Object(ctx, commitID.Revision())
require.NoError(t, err)
_, err = io.CopyN(io.Discard, object, 100)
require.NoError(t, err)
// We haven't yet consumed the previous object, so this must now fail.
- _, err = reader.reader(ctx, commitID.Revision())
+ _, err = reader.Object(ctx, commitID.Revision())
require.EqualError(t, err, fmt.Sprintf("cannot create new Object: batch contains %d unread bytes", len(commitContents)-100+1))
})
@@ -108,7 +108,7 @@ func TestObjectReader_reader(t *testing.T) {
} {
require.Equal(t, float64(0), testutil.ToFloat64(counter.WithLabelValues(objectType)))
- object, err := reader.reader(ctx, revision)
+ object, err := reader.Object(ctx, revision)
require.NoError(t, err)
require.Equal(t, float64(1), testutil.ToFloat64(counter.WithLabelValues(objectType)))