diff options
author | John Cai <jcai@gitlab.com> | 2022-04-29 17:42:22 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2022-04-29 17:42:22 +0300 |
commit | b3a0b630bd33b6f23902203d5443c7518b316b41 (patch) | |
tree | 425e3fb7e8cb6318f38ac2923a29a03c05e40085 | |
parent | 89d1dfc6c5d03e929ba4f419043577be25869d4f (diff) | |
parent | 77b8ced5bd60eebd0614e5ab408d048ab1116d43 (diff) |
Merge branch 'pks-catfile-fix-flaky-process-termination-test' into 'master'
catfile: Make process cancellation synchronous to fix test race
See merge request gitlab-org/gitaly!4500
43 files changed, 213 insertions, 232 deletions
diff --git a/internal/git/catfile/cache.go b/internal/git/catfile/cache.go index e86248feb..293ec3e65 100644 --- a/internal/git/catfile/cache.go +++ b/internal/git/catfile/cache.go @@ -34,10 +34,10 @@ const ( type Cache interface { // ObjectReader either creates a new object reader or returns a cached one for the given // repository. - ObjectReader(context.Context, git.RepositoryExecutor) (ObjectReader, error) + ObjectReader(context.Context, git.RepositoryExecutor) (ObjectReader, func(), error) // ObjectInfoReader either creates a new object info reader or returns a cached one for the // given repository. - ObjectInfoReader(context.Context, git.RepositoryExecutor) (ObjectInfoReader, error) + ObjectInfoReader(context.Context, git.RepositoryExecutor) (ObjectInfoReader, func(), error) // Evict evicts all cached processes from the cache. Evict() } @@ -67,11 +67,6 @@ type ProcessCache struct { totalCatfileProcesses prometheus.Counter catfileLookupCounter *prometheus.CounterVec catfileCacheMembers *prometheus.GaugeVec - - // cachedProcessDone is a condition that gets signalled whenever a process is being - // considered to be returned to the cache. This field is optional and must only be used in - // tests. - cachedProcessDone *sync.Cond } // NewCache creates a new catfile process cache. @@ -175,37 +170,37 @@ func (c *ProcessCache) Stop() { } // ObjectReader creates a new ObjectReader process for the given repository. -func (c *ProcessCache) ObjectReader(ctx context.Context, repo git.RepositoryExecutor) (ObjectReader, error) { - cacheable, err := c.getOrCreateProcess(ctx, repo, &c.objectReaders, func(ctx context.Context) (cacheable, error) { +func (c *ProcessCache) ObjectReader(ctx context.Context, repo git.RepositoryExecutor) (ObjectReader, func(), error) { + cacheable, cancel, err := c.getOrCreateProcess(ctx, repo, &c.objectReaders, func(ctx context.Context) (cacheable, error) { return newObjectReader(ctx, repo, c.catfileLookupCounter) - }) + }, "catfile.ObjectReader") if err != nil { - return nil, err + return nil, nil, err } objectReader, ok := cacheable.(ObjectReader) if !ok { - return nil, fmt.Errorf("expected object reader, got %T", cacheable) + return nil, nil, fmt.Errorf("expected object reader, got %T", cacheable) } - return objectReader, nil + return objectReader, cancel, nil } // ObjectInfoReader creates a new ObjectInfoReader process for the given repository. -func (c *ProcessCache) ObjectInfoReader(ctx context.Context, repo git.RepositoryExecutor) (ObjectInfoReader, error) { - cacheable, err := c.getOrCreateProcess(ctx, repo, &c.objectInfoReaders, func(ctx context.Context) (cacheable, error) { +func (c *ProcessCache) ObjectInfoReader(ctx context.Context, repo git.RepositoryExecutor) (ObjectInfoReader, func(), error) { + cacheable, cancel, err := c.getOrCreateProcess(ctx, repo, &c.objectInfoReaders, func(ctx context.Context) (cacheable, error) { return newObjectInfoReader(ctx, repo, c.catfileLookupCounter) - }) + }, "catfile.ObjectInfoReader") if err != nil { - return nil, err + return nil, nil, err } objectInfoReader, ok := cacheable.(ObjectInfoReader) if !ok { - return nil, fmt.Errorf("expected object info reader, got %T", cacheable) + return nil, nil, fmt.Errorf("expected object info reader, got %T", cacheable) } - return objectInfoReader, nil + return objectInfoReader, cancel, nil } func (c *ProcessCache) getOrCreateProcess( @@ -213,75 +208,92 @@ func (c *ProcessCache) getOrCreateProcess( repo repository.GitRepo, processes *processes, create func(context.Context) (cacheable, error), -) (_ cacheable, returnedErr error) { - requestDone := ctx.Done() - if requestDone == nil { - panic("empty ctx.Done() in catfile.Batch.New()") - } - + spanName string, +) (_ cacheable, _ func(), returnedErr error) { defer c.reportCacheMembers() - var cancel func() cacheKey, isCacheable := newCacheKey(metadata.GetValue(ctx, SessionIDField), repo) if isCacheable { - // We only try to look up cached batch processes in case it is cacheable, which - // requires a session ID. This is mostly done such that git-cat-file(1) processes - // from one user cannot interfer with those from another user. The main intent is to - // disallow trivial denial of service attacks against other users in case it is - // possible to poison the cache with broken git-cat-file(1) processes. + // We only try to look up cached processes in case it is cacheable, which requires a + // session ID. This is mostly done such that git-cat-file(1) processes from one user + // cannot interfere with those from another user. The main intent is to disallow + // trivial denial of service attacks against other users in case it is possible to + // poison the cache with broken git-cat-file(1) processes. if entry, ok := processes.Checkout(cacheKey); ok { - go c.returnWhenDone(requestDone, processes, cacheKey, entry.value, entry.cancel) c.catfileCacheCounter.WithLabelValues("hit").Inc() - return entry.value, nil + return entry.value, func() { + c.returnToCache(processes, cacheKey, entry.value, entry.cancel) + }, nil } c.catfileCacheCounter.WithLabelValues("miss").Inc() // We have not found any cached process, so we need to create a new one. In this // case, we need to detach the process from the current context such that it does - // not get killed when the current context is done. Note that while we explicitly - // `close()` processes in case this function fails, we must have a cancellable - // context or otherwise our `command` package will panic. - ctx, cancel = context.WithCancel(context.Background()) - defer func() { - if returnedErr != nil { - cancel() - } - }() - + // not get killed when the parent context is cancelled. + // + // Note that we explicitly retain feature flags here, which means that cached + // processes may retain flags for some time which have been changed meanwhile. While + // not ideal, it feels better compared to just ignoring feature flags altogether. + // The latter would mean that we cannot use flags in the catfile code, but more + // importantly we also wouldn't be able to use feature-flagged Git version upgrades + // for catfile processes. + ctx = helper.SuppressCancellation(ctx) // We have to decorrelate the process from the current context given that it // may potentially be reused across different RPC calls. ctx = correlation.ContextWithCorrelation(ctx, "") ctx = opentracing.ContextWithSpan(ctx, nil) } - batch, err := create(ctx) + span, ctx := opentracing.StartSpanFromContext(ctx, spanName) + + // Create a new cancellable process context such that we can kill it on demand. If it's a + // cached process, then it will only be killed when the cache evicts the entry because we + // detached the background further up. If it's an uncached value, we either kill it manually + // or via the RPC context's cancellation function. + ctx, cancelProcessContext := context.WithCancel(ctx) + defer func() { + if returnedErr != nil { + cancelProcessContext() + } + }() + + process, err := create(ctx) if err != nil { - return nil, err + return nil, nil, err } defer func() { - // If we somehow fail after creating a new Batch process, then we want to kill - // spawned processes right away. + // If we somehow fail after creating a new process, then we want to kill spawned + // processes right away. if returnedErr != nil { - batch.close() + process.close() } }() c.totalCatfileProcesses.Inc() c.currentCatfileProcesses.Inc() - go func() { - <-ctx.Done() + + // Note that we must make sure that `cancel` and `closeProcess` are two different variables. + // Otherwise if we passed `cancel` to `returnToCache` and then set `cancel` itself to the + // function that calls it we would accidentally call ourselves and end up with a segfault. + closeProcess := func() { + cancelProcessContext() + process.close() + span.Finish() c.currentCatfileProcesses.Dec() - }() + } + cancel := closeProcess if isCacheable { // If the process is cacheable, then we want to put the process into the cache when // the current outer context is done. - go c.returnWhenDone(requestDone, processes, cacheKey, batch, cancel) + cancel = func() { + c.returnToCache(processes, cacheKey, process, closeProcess) + } } - return batch, nil + return process, cancel, nil } func (c *ProcessCache) reportCacheMembers() { @@ -295,16 +307,9 @@ func (c *ProcessCache) Evict() { c.objectInfoReaders.Evict() } -func (c *ProcessCache) returnWhenDone(done <-chan struct{}, p *processes, cacheKey key, value cacheable, cancel func()) { - <-done - +func (c *ProcessCache) returnToCache(p *processes, cacheKey key, value cacheable, cancel func()) { defer func() { c.reportCacheMembers() - if c.cachedProcessDone != nil { - c.cachedProcessDone.L.Lock() - defer c.cachedProcessDone.L.Unlock() - c.cachedProcessDone.Broadcast() - } }() if value == nil || value.isClosed() { diff --git a/internal/git/catfile/cache_test.go b/internal/git/catfile/cache_test.go index b90124ec8..0f177be4b 100644 --- a/internal/git/catfile/cache_test.go +++ b/internal/git/catfile/cache_test.go @@ -5,7 +5,6 @@ import ( "errors" "io" "os" - "sync" "testing" "time" @@ -183,62 +182,36 @@ func TestCache_ObjectReader(t *testing.T) { cache := newCache(time.Hour, 10, helper.NewManualTicker()) defer cache.Stop() - cache.cachedProcessDone = sync.NewCond(&sync.Mutex{}) - t.Run("uncancellable", func(t *testing.T) { - ctx := testhelper.ContextWithoutCancel() - - require.PanicsWithValue(t, "empty ctx.Done() in catfile.Batch.New()", func() { - _, _ = cache.ObjectReader(ctx, repoExecutor) - }) - }) + ctx := testhelper.Context(t) t.Run("uncacheable", func(t *testing.T) { - ctx, cancel := context.WithCancel(testhelper.Context(t)) - // The context doesn't carry a session ID and is thus uncacheable. // The process should never get returned to the cache and must be // killed on context cancellation. - reader, err := cache.ObjectReader(ctx, repoExecutor) + reader, cancel, err := cache.ObjectReader(ctx, repoExecutor) require.NoError(t, err) - objectReaderImpl, ok := reader.(*objectReader) - require.True(t, ok, "expected object reader") - cancel() - // We're cheating a bit here to avoid creating a racy test by reaching into the - // process and trying to read from its stdout. If the cancel did kill the process as - // expected, then the stdout should be closed and we'll get an EOF. - output, err := io.ReadAll(objectReaderImpl.queue.stdout) - if err != nil { - require.True(t, errors.Is(err, os.ErrClosed)) - } else { - require.NoError(t, err) - } - require.Empty(t, output) - require.True(t, reader.isClosed()) require.Empty(t, keys(t, &cache.objectReaders)) }) t.Run("cacheable", func(t *testing.T) { defer cache.Evict() - ctx, cancel := context.WithCancel(testhelper.Context(t)) - ctx = correlation.ContextWithCorrelation(ctx, "1") + + ctx := correlation.ContextWithCorrelation(ctx, "1") ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(SessionIDField, "1"), ) - reader, err := cache.ObjectReader(ctx, repoExecutor) + reader, cancel, err := cache.ObjectReader(ctx, repoExecutor) require.NoError(t, err) // Cancel the context such that the process will be considered for return to the // cache and wait for the cache to collect it. - cache.cachedProcessDone.L.Lock() cancel() - defer cache.cachedProcessDone.L.Unlock() - cache.cachedProcessDone.Wait() keys := keys(t, &cache.objectReaders) require.Equal(t, []key{{ @@ -254,12 +227,12 @@ func TestCache_ObjectReader(t *testing.T) { t.Run("dirty process does not get cached", func(t *testing.T) { defer cache.Evict() - ctx, cancel := context.WithCancel(testhelper.Context(t)) - ctx = testhelper.MergeIncomingMetadata(ctx, + + ctx := testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(SessionIDField, "1"), ) - reader, err := cache.ObjectReader(ctx, repoExecutor) + reader, cancel, err := cache.ObjectReader(ctx, repoExecutor) require.NoError(t, err) // While we request object data, we do not consume it at all. The reader is thus @@ -267,12 +240,8 @@ func TestCache_ObjectReader(t *testing.T) { object, err := reader.Object(ctx, "refs/heads/master") require.NoError(t, err) - // Cancel the context such that the process will be considered for return to the - // cache and wait for the cache to collect it. - cache.cachedProcessDone.L.Lock() + // Cancel the process such that it will be considered for return to the cache. cancel() - defer cache.cachedProcessDone.L.Unlock() - cache.cachedProcessDone.Wait() require.Empty(t, keys(t, &cache.objectReaders)) @@ -283,24 +252,20 @@ func TestCache_ObjectReader(t *testing.T) { t.Run("closed process does not get cached", func(t *testing.T) { defer cache.Evict() - ctx, cancel := context.WithCancel(testhelper.Context(t)) - ctx = testhelper.MergeIncomingMetadata(ctx, + + ctx := testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(SessionIDField, "1"), ) - reader, err := cache.ObjectReader(ctx, repoExecutor) + reader, cancel, err := cache.ObjectReader(ctx, repoExecutor) require.NoError(t, err) // Closed processes naturally cannot be reused anymore and thus shouldn't ever get // cached. reader.close() - // Cancel the context such that the process will be considered for return to the - // cache and wait for the cache to collect it. - cache.cachedProcessDone.L.Lock() + // Cancel the process such that it will be considered for return to the cache. cancel() - defer cache.cachedProcessDone.L.Unlock() - cache.cachedProcessDone.Wait() require.Empty(t, keys(t, &cache.objectReaders)) }) @@ -312,62 +277,35 @@ func TestCache_ObjectInfoReader(t *testing.T) { cache := newCache(time.Hour, 10, helper.NewManualTicker()) defer cache.Stop() - cache.cachedProcessDone = sync.NewCond(&sync.Mutex{}) - - t.Run("uncancellable", func(t *testing.T) { - ctx := testhelper.ContextWithoutCancel() - require.PanicsWithValue(t, "empty ctx.Done() in catfile.Batch.New()", func() { - _, _ = cache.ObjectInfoReader(ctx, repoExecutor) - }) - }) + ctx := testhelper.Context(t) t.Run("uncacheable", func(t *testing.T) { - ctx, cancel := context.WithCancel(testhelper.Context(t)) - // The context doesn't carry a session ID and is thus uncacheable. // The process should never get returned to the cache and must be // killed on context cancellation. - reader, err := cache.ObjectInfoReader(ctx, repoExecutor) + reader, cancel, err := cache.ObjectInfoReader(ctx, repoExecutor) require.NoError(t, err) - objectInfoReaderImpl, ok := reader.(*objectInfoReader) - require.True(t, ok, "expected object reader") - cancel() - // We're cheating a bit here to avoid creating a racy test by reaching into the - // process and trying to read from its stdout. If the cancel did kill the process as - // expected, then the stdout should be closed and we'll get an EOF. - output, err := io.ReadAll(objectInfoReaderImpl.queue.stdout) - if err != nil { - require.True(t, errors.Is(err, os.ErrClosed)) - } else { - require.NoError(t, err) - } - require.Empty(t, output) - require.True(t, reader.isClosed()) require.Empty(t, keys(t, &cache.objectInfoReaders)) }) t.Run("cacheable", func(t *testing.T) { defer cache.Evict() - ctx, cancel := context.WithCancel(testhelper.Context(t)) - ctx = correlation.ContextWithCorrelation(ctx, "1") + + ctx := correlation.ContextWithCorrelation(ctx, "1") ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(SessionIDField, "1"), ) - reader, err := cache.ObjectInfoReader(ctx, repoExecutor) + reader, cancel, err := cache.ObjectInfoReader(ctx, repoExecutor) require.NoError(t, err) - // Cancel the context such that the process will be considered for return to the - // cache and wait for the cache to collect it. - cache.cachedProcessDone.L.Lock() + // Cancel the process such it will be considered for return to the cache. cancel() - defer cache.cachedProcessDone.L.Unlock() - cache.cachedProcessDone.Wait() keys := keys(t, &cache.objectInfoReaders) require.Equal(t, []key{{ @@ -383,24 +321,20 @@ func TestCache_ObjectInfoReader(t *testing.T) { t.Run("closed process does not get cached", func(t *testing.T) { defer cache.Evict() - ctx, cancel := context.WithCancel(testhelper.Context(t)) - ctx = testhelper.MergeIncomingMetadata(ctx, + + ctx := testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(SessionIDField, "1"), ) - reader, err := cache.ObjectInfoReader(ctx, repoExecutor) + reader, cancel, err := cache.ObjectInfoReader(ctx, repoExecutor) require.NoError(t, err) // Closed processes naturally cannot be reused anymore and thus shouldn't ever get // cached. reader.close() - // Cancel the context such that the process will be considered for return to the - // cache and wait for the cache to collect it. - cache.cachedProcessDone.L.Lock() + // Cancel the process such that it will be considered for return to the cache. cancel() - defer cache.cachedProcessDone.L.Unlock() - cache.cachedProcessDone.Wait() require.Empty(t, keys(t, &cache.objectInfoReaders)) }) diff --git a/internal/git/catfile/object_info_reader.go b/internal/git/catfile/object_info_reader.go index 300459e81..e7773acd2 100644 --- a/internal/git/catfile/object_info_reader.go +++ b/internal/git/catfile/object_info_reader.go @@ -8,7 +8,6 @@ import ( "strings" "sync/atomic" - "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/v14/internal/command" "gitlab.com/gitlab-org/gitaly/v14/internal/git" @@ -139,8 +138,6 @@ func newObjectInfoReader( repo git.RepositoryExecutor, counter *prometheus.CounterVec, ) (*objectInfoReader, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.ObjectInfoReader") - batchCmd, err := repo.Exec(ctx, git.SubCmd{ Name: "cat-file", @@ -163,12 +160,6 @@ func newObjectInfoReader( stdin: bufio.NewWriter(batchCmd), }, } - go func() { - <-ctx.Done() - // This is crucial to prevent leaking file descriptors. - objectInfoReader.close() - span.Finish() - }() return objectInfoReader, nil } diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go index 7020adcbe..8dedb1572 100644 --- a/internal/git/catfile/object_reader.go +++ b/internal/git/catfile/object_reader.go @@ -8,7 +8,6 @@ import ( "os" "sync/atomic" - "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/v14/internal/command" "gitlab.com/gitlab-org/gitaly/v14/internal/git" @@ -61,8 +60,6 @@ func newObjectReader( repo git.RepositoryExecutor, counter *prometheus.CounterVec, ) (*objectReader, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.ObjectReader") - batchCmd, err := repo.Exec(ctx, git.SubCmd{ Name: "cat-file", @@ -86,11 +83,6 @@ func newObjectReader( stdin: bufio.NewWriter(batchCmd), }, } - go func() { - <-ctx.Done() - objectReader.close() - span.Finish() - }() return objectReader, nil } diff --git a/internal/git/catfile/testhelper_test.go b/internal/git/catfile/testhelper_test.go index 9ad072f10..b7e7172cd 100644 --- a/internal/git/catfile/testhelper_test.go +++ b/internal/git/catfile/testhelper_test.go @@ -60,8 +60,9 @@ func setupObjectReader(t *testing.T, ctx context.Context) (config.Cfg, ObjectRea cache := newCache(1*time.Hour, 1000, helper.NewTimerTicker(defaultEvictionInterval)) t.Cleanup(cache.Stop) - objectReader, err := cache.ObjectReader(ctx, repoExecutor) + objectReader, cancel, err := cache.ObjectReader(ctx, repoExecutor) require.NoError(t, err) + t.Cleanup(cancel) return cfg, objectReader, repo } diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go index 0c774d48d..80a1810d9 100644 --- a/internal/git/gitpipe/catfile_info_test.go +++ b/internal/git/gitpipe/catfile_info_test.go @@ -132,8 +132,9 @@ func TestCatfileInfo(t *testing.T) { catfileCache := catfile.NewCache(cfg) defer catfileCache.Stop() - objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo) + objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo) require.NoError(t, err) + defer cancel() it, err := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(tc.revlistInputs), tc.opts...) require.NoError(t, err) diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go index fa74d2f06..95a9b3ad6 100644 --- a/internal/git/gitpipe/catfile_object_test.go +++ b/internal/git/gitpipe/catfile_object_test.go @@ -74,8 +74,9 @@ func TestCatfileObject(t *testing.T) { catfileCache := catfile.NewCache(cfg) defer catfileCache.Stop() - objectReader, err := catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo) require.NoError(t, err) + defer cancel() it, err := CatfileObject(ctx, objectReader, NewCatfileInfoIterator(tc.catfileInfoInputs)) require.NoError(t, err) diff --git a/internal/git/gitpipe/pipeline_test.go b/internal/git/gitpipe/pipeline_test.go index 069bb5584..544ad552d 100644 --- a/internal/git/gitpipe/pipeline_test.go +++ b/internal/git/gitpipe/pipeline_test.go @@ -224,11 +224,13 @@ func TestPipeline_revlist(t *testing.T) { catfileCache := catfile.NewCache(cfg) defer catfileCache.Stop() - objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo) + objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo) require.NoError(t, err) + defer cancel() - objectReader, err := catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo) require.NoError(t, err) + defer cancel() revlistIter := Revlist(ctx, repo, tc.revisions, tc.revlistOptions...) @@ -280,11 +282,13 @@ func TestPipeline_revlist(t *testing.T) { catfileCache := catfile.NewCache(cfg) defer catfileCache.Stop() - objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo) + objectInfoReader, cancelReader, err := catfileCache.ObjectInfoReader(ctx, repo) require.NoError(t, err) + defer cancelReader() - objectReader, err := catfileCache.ObjectReader(ctx, repo) + objectReader, cancelReader, err := catfileCache.ObjectReader(ctx, repo) require.NoError(t, err) + defer cancelReader() revlistIter := Revlist(ctx, repo, []string{"--all"}) @@ -320,11 +324,13 @@ func TestPipeline_revlist(t *testing.T) { catfileCache := catfile.NewCache(cfg) defer catfileCache.Stop() - objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo) + objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo) require.NoError(t, err) + defer cancel() - objectReader, err := catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo) require.NoError(t, err) + defer cancel() revlistIter := Revlist(ctx, repo, []string{"--all"}, WithObjects()) @@ -372,11 +378,13 @@ func TestPipeline_forEachRef(t *testing.T) { catfileCache := catfile.NewCache(cfg) defer catfileCache.Stop() - objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo) + objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo) require.NoError(t, err) + defer cancel() - objectReader, err := catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo) require.NoError(t, err) + defer cancel() forEachRefIter := ForEachRef(ctx, repo, nil) diff --git a/internal/git/gittest/commit_test.go b/internal/git/gittest/commit_test.go index 515455252..3494ad382 100644 --- a/internal/git/gittest/commit_test.go +++ b/internal/git/gittest/commit_test.go @@ -22,8 +22,9 @@ func TestWriteCommit(t *testing.T) { catfileCache := catfile.NewCache(cfg) defer catfileCache.Stop() - objectReader, err := catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo) require.NoError(t, err) + defer cancel() defaultCommitter := &gitalypb.CommitAuthor{ Name: []byte("Scrooge McDuck"), diff --git a/internal/git/localrepo/objects.go b/internal/git/localrepo/objects.go index e5459d13b..fa37ab29e 100644 --- a/internal/git/localrepo/objects.go +++ b/internal/git/localrepo/objects.go @@ -226,10 +226,11 @@ func (repo *Repo) ReadCommit(ctx context.Context, revision git.Revision, opts .. opt(&cfg) } - objectReader, err := repo.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := repo.catfileCache.ObjectReader(ctx, repo) if err != nil { return nil, err } + defer cancel() var commit *gitalypb.GitCommit if cfg.withTrailers { diff --git a/internal/git/log/parser.go b/internal/git/log/parser.go index f176fe140..7b7cf5d37 100644 --- a/internal/git/log/parser.go +++ b/internal/git/log/parser.go @@ -23,10 +23,10 @@ type Parser struct { } // NewParser returns a new Parser -func NewParser(ctx context.Context, catfileCache catfile.Cache, repo git.RepositoryExecutor, src io.Reader) (*Parser, error) { - objectReader, err := catfileCache.ObjectReader(ctx, repo) +func NewParser(ctx context.Context, catfileCache catfile.Cache, repo git.RepositoryExecutor, src io.Reader) (*Parser, func(), error) { + objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo) if err != nil { - return nil, err + return nil, nil, err } parser := &Parser{ @@ -34,7 +34,7 @@ func NewParser(ctx context.Context, catfileCache catfile.Cache, repo git.Reposit objectReader: objectReader, } - return parser, nil + return parser, cancel, nil } // Parse parses a single git log line. It returns true if successful, false if it finished diff --git a/internal/gitaly/service/blob/blobs.go b/internal/gitaly/service/blob/blobs.go index 50a8a6274..86acae6a2 100644 --- a/internal/gitaly/service/blob/blobs.go +++ b/internal/gitaly/service/blob/blobs.go @@ -98,10 +98,11 @@ func (s *server) processBlobs( // object iterator. We thus support an optional `catfileInfoIter` parameter: if set, // we just use that one and ignore the object iterator. if catfileInfoIter == nil { - objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo) + objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(ctx, repo) if err != nil { return helper.ErrInternal(fmt.Errorf("creating object info reader: %w", err)) } + defer cancel() catfileInfoIter, err = gitpipe.CatfileInfo(ctx, objectInfoReader, objectIter) if err != nil { @@ -132,10 +133,11 @@ func (s *server) processBlobs( return helper.ErrInternal(err) } } else { - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err)) } + defer cancel() catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, objectIter) if err != nil { diff --git a/internal/gitaly/service/blob/get_blob.go b/internal/gitaly/service/blob/get_blob.go index 99ce66873..d7784ea7b 100644 --- a/internal/gitaly/service/blob/get_blob.go +++ b/internal/gitaly/service/blob/get_blob.go @@ -20,10 +20,11 @@ func (s *server) GetBlob(in *gitalypb.GetBlobRequest, stream gitalypb.BlobServic return helper.ErrInvalidArgumentf("GetBlob: %v", err) } - objectReader, err := s.catfileCache.ObjectReader(stream.Context(), repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(stream.Context(), repo) if err != nil { return helper.ErrInternalf("GetBlob: %v", err) } + defer cancel() blob, err := objectReader.Object(ctx, git.Revision(in.Oid)) if err != nil { diff --git a/internal/gitaly/service/blob/get_blobs.go b/internal/gitaly/service/blob/get_blobs.go index 5b899ebe7..237c85c17 100644 --- a/internal/gitaly/service/blob/get_blobs.go +++ b/internal/gitaly/service/blob/get_blobs.go @@ -159,15 +159,17 @@ func (s *server) GetBlobs(req *gitalypb.GetBlobsRequest, stream gitalypb.BlobSer repo := s.localrepo(req.GetRepository()) - objectReader, err := s.catfileCache.ObjectReader(stream.Context(), repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(stream.Context(), repo) if err != nil { return err } + defer cancel() - objectInfoReader, err := s.catfileCache.ObjectInfoReader(stream.Context(), repo) + objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(stream.Context(), repo) if err != nil { return err } + defer cancel() return sendGetBlobsResponse(req, stream, objectReader, objectInfoReader) } diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go index ca05b4750..57027d4c8 100644 --- a/internal/gitaly/service/blob/lfs_pointers.go +++ b/internal/gitaly/service/blob/lfs_pointers.go @@ -52,10 +52,11 @@ func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream git repo := s.localrepo(in.GetRepository()) - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err)) } + defer cancel() revlistIter := gitpipe.Revlist(ctx, repo, in.GetRevisions(), gitpipe.WithObjects(), @@ -94,10 +95,11 @@ func (s *server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stre }, }) - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err)) } + defer cancel() catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo, gitpipe.WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool { @@ -137,15 +139,17 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita }, }) - objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo) + objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(ctx, repo) if err != nil { return helper.ErrInternal(fmt.Errorf("creating object info reader: %w", err)) } + defer cancel() - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err)) } + defer cancel() blobs := make([]gitpipe.RevisionResult, len(req.GetBlobIds())) for i, blobID := range req.GetBlobIds() { @@ -160,6 +164,7 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita if err != nil { return helper.ErrInternalf("creating object info iterator: %w", err) } + defer cancel() catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) if err != nil { diff --git a/internal/gitaly/service/cleanup/apply_bfg_object_map_stream.go b/internal/gitaly/service/cleanup/apply_bfg_object_map_stream.go index d10c483d1..bcd015bab 100644 --- a/internal/gitaly/service/cleanup/apply_bfg_object_map_stream.go +++ b/internal/gitaly/service/cleanup/apply_bfg_object_map_stream.go @@ -40,10 +40,11 @@ func (s *server) ApplyBfgObjectMapStream(server gitalypb.CleanupService_ApplyBfg reader := &bfgStreamReader{firstRequest: firstRequest, server: server} chunker := chunk.New(&bfgStreamWriter{server: server}) - notifier, err := notifier.New(ctx, s.catfileCache, repo, chunker) + notifier, cancel, err := notifier.New(ctx, s.catfileCache, repo, chunker) if err != nil { return helper.ErrInternal(err) } + defer cancel() // It doesn't matter if new internal references are added after this RPC // starts running - they shouldn't point to the objects removed by the BFG diff --git a/internal/gitaly/service/cleanup/notifier/notifier.go b/internal/gitaly/service/cleanup/notifier/notifier.go index 6b6acff88..140b29eae 100644 --- a/internal/gitaly/service/cleanup/notifier/notifier.go +++ b/internal/gitaly/service/cleanup/notifier/notifier.go @@ -17,13 +17,13 @@ type Notifier struct { } // New instantiates a new Notifier -func New(ctx context.Context, catfileCache catfile.Cache, repo git.RepositoryExecutor, chunker *chunk.Chunker) (*Notifier, error) { - objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo) +func New(ctx context.Context, catfileCache catfile.Cache, repo git.RepositoryExecutor, chunker *chunk.Chunker) (*Notifier, func(), error) { + objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo) if err != nil { - return nil, err + return nil, nil, err } - return &Notifier{objectInfoReader: objectInfoReader, chunker: chunker}, nil + return &Notifier{objectInfoReader: objectInfoReader, chunker: chunker}, cancel, nil } // Notify builds a new message and sends it to the chunker diff --git a/internal/gitaly/service/commit/check_objects_exist.go b/internal/gitaly/service/commit/check_objects_exist.go index baab66f51..83512b340 100644 --- a/internal/gitaly/service/commit/check_objects_exist.go +++ b/internal/gitaly/service/commit/check_objects_exist.go @@ -26,13 +26,14 @@ func (s *server) CheckObjectsExist( return err } - objectInfoReader, err := s.catfileCache.ObjectInfoReader( + objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader( ctx, s.localrepo(request.GetRepository()), ) if err != nil { return err } + defer cancel() chunker := chunk.New(&checkObjectsExistSender{stream: stream}) for { diff --git a/internal/gitaly/service/commit/commit_messages.go b/internal/gitaly/service/commit/commit_messages.go index 25b334e9e..0e9424f52 100644 --- a/internal/gitaly/service/commit/commit_messages.go +++ b/internal/gitaly/service/commit/commit_messages.go @@ -27,10 +27,11 @@ func (s *server) getAndStreamCommitMessages(request *gitalypb.GetCommitMessagesR ctx := stream.Context() repo := s.localrepo(request.GetRepository()) - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return err } + defer cancel() for _, commitID := range request.GetCommitIds() { msg, err := catfile.GetCommitMessage(ctx, objectReader, repo, git.Revision(commitID)) diff --git a/internal/gitaly/service/commit/commit_signatures.go b/internal/gitaly/service/commit/commit_signatures.go index f2e3a7ac4..91ea5bf7c 100644 --- a/internal/gitaly/service/commit/commit_signatures.go +++ b/internal/gitaly/service/commit/commit_signatures.go @@ -30,10 +30,11 @@ func (s *server) getCommitSignatures(request *gitalypb.GetCommitSignaturesReques ctx := stream.Context() repo := s.localrepo(request.GetRepository()) - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return helper.ErrInternal(err) } + defer cancel() for _, commitID := range request.CommitIds { commitObj, err := objectReader.Object(ctx, git.Revision(commitID)+"^{commit}") diff --git a/internal/gitaly/service/commit/commits_helper.go b/internal/gitaly/service/commit/commits_helper.go index cb5715992..2287d9674 100644 --- a/internal/gitaly/service/commit/commits_helper.go +++ b/internal/gitaly/service/commit/commits_helper.go @@ -29,10 +29,11 @@ func (s *server) sendCommits( return err } - logParser, err := log.NewParser(ctx, s.catfileCache, repo, cmd) + logParser, cancel, err := log.NewParser(ctx, s.catfileCache, repo, cmd) if err != nil { return err } + defer cancel() chunker := chunk.New(sender) for logParser.Parse(ctx) { diff --git a/internal/gitaly/service/commit/filter_shas_with_signatures.go b/internal/gitaly/service/commit/filter_shas_with_signatures.go index 09d22d881..3caa401fe 100644 --- a/internal/gitaly/service/commit/filter_shas_with_signatures.go +++ b/internal/gitaly/service/commit/filter_shas_with_signatures.go @@ -38,10 +38,11 @@ func (s *server) filterShasWithSignatures(bidi gitalypb.CommitService_FilterShas ctx := bidi.Context() repo := s.localrepo(firstRequest.GetRepository()) - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return err } + defer cancel() request := firstRequest for { diff --git a/internal/gitaly/service/commit/find_commits.go b/internal/gitaly/service/commit/find_commits.go index 8d6aaf048..12c737b52 100644 --- a/internal/gitaly/service/commit/find_commits.go +++ b/internal/gitaly/service/commit/find_commits.go @@ -58,10 +58,11 @@ func (s *server) findCommits(ctx context.Context, req *gitalypb.FindCommitsReque return fmt.Errorf("error when creating git log command: %v", err) } - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return fmt.Errorf("creating catfile: %v", err) } + defer cancel() getCommits := NewGetCommits(logCmd, objectReader) diff --git a/internal/gitaly/service/commit/last_commit_for_path.go b/internal/gitaly/service/commit/last_commit_for_path.go index 9b8468c92..a95bfdcf3 100644 --- a/internal/gitaly/service/commit/last_commit_for_path.go +++ b/internal/gitaly/service/commit/last_commit_for_path.go @@ -29,10 +29,11 @@ func (s *server) lastCommitForPath(ctx context.Context, in *gitalypb.LastCommitF } repo := s.localrepo(in.GetRepository()) - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return nil, err } + defer cancel() options := in.GetGlobalOptions() diff --git a/internal/gitaly/service/commit/list_all_commits.go b/internal/gitaly/service/commit/list_all_commits.go index d5fa18622..1709f5cf6 100644 --- a/internal/gitaly/service/commit/list_all_commits.go +++ b/internal/gitaly/service/commit/list_all_commits.go @@ -30,10 +30,11 @@ func (s *server) ListAllCommits( ctx := stream.Context() repo := s.localrepo(request.GetRepository()) - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return helper.ErrInternalf("creating object reader: %w", err) } + defer cancel() // If we've got a pagination token, then we will only start to print commits as soon as // we've seen the token. diff --git a/internal/gitaly/service/commit/list_commits.go b/internal/gitaly/service/commit/list_commits.go index 826edb9ee..830e1bf46 100644 --- a/internal/gitaly/service/commit/list_commits.go +++ b/internal/gitaly/service/commit/list_commits.go @@ -39,10 +39,11 @@ func (s *server) ListCommits( ctx := stream.Context() repo := s.localrepo(request.GetRepository()) - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err)) } + defer cancel() revlistOptions := []gitpipe.RevlistOption{} diff --git a/internal/gitaly/service/commit/list_commits_by_oid.go b/internal/gitaly/service/commit/list_commits_by_oid.go index 7ef89f58a..4c736b2d8 100644 --- a/internal/gitaly/service/commit/list_commits_by_oid.go +++ b/internal/gitaly/service/commit/list_commits_by_oid.go @@ -25,10 +25,11 @@ func (s *server) ListCommitsByOid(in *gitalypb.ListCommitsByOidRequest, stream g ctx := stream.Context() repo := s.localrepo(in.GetRepository()) - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return err } + defer cancel() sender := chunk.New(&commitsByOidSender{stream: stream}) listCommitsbyOidHistogram.Observe(float64(len(in.Oid))) diff --git a/internal/gitaly/service/commit/list_commits_by_ref_name.go b/internal/gitaly/service/commit/list_commits_by_ref_name.go index f6d84e1cd..5b0784f77 100644 --- a/internal/gitaly/service/commit/list_commits_by_ref_name.go +++ b/internal/gitaly/service/commit/list_commits_by_ref_name.go @@ -13,10 +13,11 @@ func (s *server) ListCommitsByRefName(in *gitalypb.ListCommitsByRefNameRequest, ctx := stream.Context() repo := s.localrepo(in.GetRepository()) - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return helper.ErrInternal(err) } + defer cancel() sender := chunk.New(&commitsByRefNameSender{stream: stream}) diff --git a/internal/gitaly/service/commit/list_last_commits_for_tree.go b/internal/gitaly/service/commit/list_last_commits_for_tree.go index 99c3ea0a9..9d0a7a64e 100644 --- a/internal/gitaly/service/commit/list_last_commits_for_tree.go +++ b/internal/gitaly/service/commit/list_last_commits_for_tree.go @@ -35,10 +35,11 @@ func (s *server) listLastCommitsForTree(in *gitalypb.ListLastCommitsForTreeReque ctx := stream.Context() repo := s.localrepo(in.GetRepository()) - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return err } + defer cancel() batch := make([]*gitalypb.ListLastCommitsForTreeResponse_CommitForTree, 0, maxNumStatBatchSize) entries, err := getLSTreeEntries(parser) diff --git a/internal/gitaly/service/commit/tree_entries.go b/internal/gitaly/service/commit/tree_entries.go index 96bf79693..ab252eecf 100644 --- a/internal/gitaly/service/commit/tree_entries.go +++ b/internal/gitaly/service/commit/tree_entries.go @@ -147,16 +147,19 @@ func (s *server) sendTreeEntries( } } else { var err error + var cancel func() - objectReader, err = s.catfileCache.ObjectReader(stream.Context(), repo) + objectReader, cancel, err = s.catfileCache.ObjectReader(stream.Context(), repo) if err != nil { return err } + defer cancel() - objectInfoReader, err = s.catfileCache.ObjectInfoReader(stream.Context(), repo) + objectInfoReader, cancel, err = s.catfileCache.ObjectInfoReader(stream.Context(), repo) if err != nil { return err } + defer cancel() entries, err = catfile.TreeEntries(ctx, objectReader, objectInfoReader, revision, path) if err != nil { diff --git a/internal/gitaly/service/commit/tree_entry.go b/internal/gitaly/service/commit/tree_entry.go index 2573fe3c0..9203931c4 100644 --- a/internal/gitaly/service/commit/tree_entry.go +++ b/internal/gitaly/service/commit/tree_entry.go @@ -132,15 +132,17 @@ func (s *server) TreeEntry(in *gitalypb.TreeEntryRequest, stream gitalypb.Commit requestPath = strings.TrimRight(requestPath, "/") } - objectReader, err := s.catfileCache.ObjectReader(stream.Context(), repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(stream.Context(), repo) if err != nil { return err } + defer cancel() - objectInfoReader, err := s.catfileCache.ObjectInfoReader(stream.Context(), repo) + objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(stream.Context(), repo) if err != nil { return err } + defer cancel() return sendTreeEntry(stream, objectReader, objectInfoReader, string(in.GetRevision()), requestPath, in.GetLimit(), in.GetMaxSize()) } diff --git a/internal/gitaly/service/operations/tags.go b/internal/gitaly/service/operations/tags.go index 68acb036a..678e1d68c 100644 --- a/internal/gitaly/service/operations/tags.go +++ b/internal/gitaly/service/operations/tags.go @@ -178,15 +178,17 @@ func (s *Server) createTag( committer *gitalypb.User, committerTime time.Time, ) (*gitalypb.Tag, git.ObjectID, error) { - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return nil, "", status.Error(codes.Internal, err.Error()) } + defer cancel() - objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo) + objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(ctx, repo) if err != nil { return nil, "", status.Error(codes.Internal, err.Error()) } + defer cancel() // We allow all ways to name a revision that cat-file // supports, not just OID. Resolve it. diff --git a/internal/gitaly/service/ref/find_all_tags.go b/internal/gitaly/service/ref/find_all_tags.go index 4bfb2d0e9..1bf51fde9 100644 --- a/internal/gitaly/service/ref/find_all_tags.go +++ b/internal/gitaly/service/ref/find_all_tags.go @@ -40,10 +40,11 @@ func (s *server) FindAllTags(in *gitalypb.FindAllTagsRequest, stream gitalypb.Re } func (s *server) findAllTags(ctx context.Context, repo *localrepo.Repo, sortField string, stream gitalypb.RefService_FindAllTagsServer, opts *paginationOpts) error { - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return fmt.Errorf("error creating object reader: %v", err) } + defer cancel() forEachRefIter := gitpipe.ForEachRef( ctx, diff --git a/internal/gitaly/service/ref/find_all_tags_test.go b/internal/gitaly/service/ref/find_all_tags_test.go index 487790b7d..b562d74fe 100644 --- a/internal/gitaly/service/ref/find_all_tags_test.go +++ b/internal/gitaly/service/ref/find_all_tags_test.go @@ -403,11 +403,13 @@ func TestFindAllTags_nestedTags(t *testing.T) { catfileCache := catfile.NewCache(cfg) defer catfileCache.Stop() - objectReader, err := catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo) require.NoError(t, err) + defer cancel() - objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo) + objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo) require.NoError(t, err) + defer cancel() info, err := objectInfoReader.Info(ctx, git.Revision(tc.originalOid)) require.NoError(t, err) diff --git a/internal/gitaly/service/ref/refs.go b/internal/gitaly/service/ref/refs.go index 63df4f665..76cebb1b8 100644 --- a/internal/gitaly/service/ref/refs.go +++ b/internal/gitaly/service/ref/refs.go @@ -109,10 +109,11 @@ func (s *server) findLocalBranches(in *gitalypb.FindLocalBranchesRequest, stream ctx := stream.Context() repo := s.localrepo(in.GetRepository()) - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return err } + defer cancel() writer := newFindLocalBranchesWriter(stream, objectReader) opts := buildFindRefsOpts(ctx, in.GetPaginationParams()) @@ -161,10 +162,11 @@ func (s *server) findAllBranches(in *gitalypb.FindAllBranchesRequest, stream git } ctx := stream.Context() - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return err } + defer cancel() opts := buildFindRefsOpts(ctx, nil) opts.cmdArgs = args @@ -240,10 +242,11 @@ func (s *server) findTag(ctx context.Context, repo git.RepositoryExecutor, tagNa return nil, fmt.Errorf("for-each-ref error: %v", err) } - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return nil, err } + defer cancel() var tag *gitalypb.Tag diff --git a/internal/gitaly/service/ref/refs_test.go b/internal/gitaly/service/ref/refs_test.go index 7278f7767..bac8b3e81 100644 --- a/internal/gitaly/service/ref/refs_test.go +++ b/internal/gitaly/service/ref/refs_test.go @@ -1129,11 +1129,13 @@ func TestFindTagNestedTag(t *testing.T) { catfileCache := catfile.NewCache(cfg) defer catfileCache.Stop() - objectReader, err := catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo) require.NoError(t, err) + defer cancel() - objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo) + objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo) require.NoError(t, err) + defer cancel() info, err := objectInfoReader.Info(ctx, git.Revision(tc.originalOid)) require.NoError(t, err) diff --git a/internal/gitaly/service/ref/remote_branches.go b/internal/gitaly/service/ref/remote_branches.go index 57ed60999..343950e87 100644 --- a/internal/gitaly/service/ref/remote_branches.go +++ b/internal/gitaly/service/ref/remote_branches.go @@ -31,10 +31,11 @@ func (s *server) findAllRemoteBranches(req *gitalypb.FindAllRemoteBranchesReques patterns := []string{"refs/remotes/" + req.GetRemoteName()} ctx := stream.Context() - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return err } + defer cancel() opts := buildFindRefsOpts(ctx, nil) opts.cmdArgs = args diff --git a/internal/gitaly/service/ref/tag_messages.go b/internal/gitaly/service/ref/tag_messages.go index 6f885eded..895a5098b 100644 --- a/internal/gitaly/service/ref/tag_messages.go +++ b/internal/gitaly/service/ref/tag_messages.go @@ -37,10 +37,11 @@ func (s *server) getAndStreamTagMessages(request *gitalypb.GetTagMessagesRequest ctx := stream.Context() repo := s.localrepo(request.GetRepository()) - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return err } + defer cancel() for _, tagID := range request.GetTagIds() { tag, err := catfile.GetTag(ctx, objectReader, git.Revision(tagID), "") diff --git a/internal/gitaly/service/ref/tag_signatures.go b/internal/gitaly/service/ref/tag_signatures.go index 8bdf78479..7b05640d2 100644 --- a/internal/gitaly/service/ref/tag_signatures.go +++ b/internal/gitaly/service/ref/tag_signatures.go @@ -39,10 +39,11 @@ func (s *server) GetTagSignatures(req *gitalypb.GetTagSignaturesRequest, stream ctx := stream.Context() repo := s.localrepo(req.GetRepository()) - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return helper.ErrInternalf("creating object reader: %w", err) } + defer cancel() chunker := chunk.New(&tagSignatureSender{ send: func(signatures []*gitalypb.GetTagSignaturesResponse_TagSignature) error { diff --git a/internal/gitaly/service/repository/apply_gitattributes.go b/internal/gitaly/service/repository/apply_gitattributes.go index 5167176e7..f2ec2e422 100644 --- a/internal/gitaly/service/repository/apply_gitattributes.go +++ b/internal/gitaly/service/repository/apply_gitattributes.go @@ -132,10 +132,11 @@ func (s *server) ApplyGitattributes(ctx context.Context, in *gitalypb.ApplyGitat return nil, helper.ErrInvalidArgumentf("revision: %v", err) } - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return nil, err } + defer cancel() if err := s.applyGitattributes(ctx, repo, objectReader, repoPath, in.GetRevision()); err != nil { return nil, err diff --git a/internal/gitaly/service/repository/archive.go b/internal/gitaly/service/repository/archive.go index 878572f91..6c37e1b2c 100644 --- a/internal/gitaly/service/repository/archive.go +++ b/internal/gitaly/service/repository/archive.go @@ -147,15 +147,17 @@ func (s *server) validateGetArchivePrecondition( path string, exclude []string, ) error { - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return err } + defer cancel() - objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo) + objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(ctx, repo) if err != nil { return err } + defer cancel() f := catfile.NewTreeEntryFinder(objectReader, objectInfoReader) if path != "." { diff --git a/internal/gitaly/service/repository/gc.go b/internal/gitaly/service/repository/gc.go index 6e9d3533f..09117f516 100644 --- a/internal/gitaly/service/repository/gc.go +++ b/internal/gitaly/service/repository/gc.go @@ -96,10 +96,11 @@ func (s *server) cleanupKeepArounds(ctx context.Context, repo *localrepo.Repo) e return nil } - objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo) + objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(ctx, repo) if err != nil { return nil } + defer cancel() keepAroundsPrefix := "refs/keep-around" keepAroundsPath := filepath.Join(repoPath, keepAroundsPrefix) diff --git a/internal/gitaly/service/repository/raw_changes.go b/internal/gitaly/service/repository/raw_changes.go index fb1c2c5a1..5c3184b88 100644 --- a/internal/gitaly/service/repository/raw_changes.go +++ b/internal/gitaly/service/repository/raw_changes.go @@ -20,10 +20,11 @@ func (s *server) GetRawChanges(req *gitalypb.GetRawChangesRequest, stream gitaly ctx := stream.Context() repo := s.localrepo(req.GetRepository()) - objectInfoReader, err := s.catfileCache.ObjectInfoReader(stream.Context(), repo) + objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(stream.Context(), repo) if err != nil { return helper.ErrInternal(err) } + defer cancel() if err := validateRawChangesRequest(ctx, req, objectInfoReader); err != nil { return helper.ErrInvalidArgument(err) |