Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2022-04-26 13:15:48 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2022-04-29 13:56:46 +0300
commitfa62277145bce6832f98181d4c3d660ccfa570d1 (patch)
tree8f9b318dbd46bad16ed2e30e9a5b7b6023f08b23
parent886b83ee7ce2b47c6b215d7940dd364749fbae01 (diff)
catfile: Make process cancellation synchronous to fix test race
The catfile cache is using Goroutines to clean up processes. This is not deterministic and thus frequently causes our tests to flake when we lose one of the races caused by this. Furthermore, the asynchronicity makes the code quite hard to understand. Refactor the code to use synchronous cancellation. Instead of waiting for the context to get cancelled, it is now the caller that is responsible for killing the process. The caller transparently either gets a cancellation function that - kills the process directly in case it is an uncachable process. - or returns the process to the cache in case it is a cachable process. This significantly simplifies the code and also makes it deterministic, which fixes the flaky tests we have observed. Furthermore, this also allows us to get rid of one more Goroutine per cached catfile process. Note that this allows us to get rid of the signalling logic we had in place previous to this change: there is no need to wait for processes to return to the cache in our tests anymore because this is now guaranteed after the cancellation function was called.
-rw-r--r--internal/git/catfile/cache.go94
-rw-r--r--internal/git/catfile/cache_test.go112
-rw-r--r--internal/git/catfile/testhelper_test.go3
-rw-r--r--internal/git/gitpipe/catfile_info_test.go3
-rw-r--r--internal/git/gitpipe/catfile_object_test.go3
-rw-r--r--internal/git/gitpipe/pipeline_test.go24
-rw-r--r--internal/git/gittest/commit_test.go3
-rw-r--r--internal/git/localrepo/objects.go3
-rw-r--r--internal/git/log/parser.go8
-rw-r--r--internal/gitaly/service/blob/blobs.go6
-rw-r--r--internal/gitaly/service/blob/get_blob.go3
-rw-r--r--internal/gitaly/service/blob/get_blobs.go6
-rw-r--r--internal/gitaly/service/blob/lfs_pointers.go13
-rw-r--r--internal/gitaly/service/cleanup/apply_bfg_object_map_stream.go3
-rw-r--r--internal/gitaly/service/cleanup/notifier/notifier.go8
-rw-r--r--internal/gitaly/service/commit/check_objects_exist.go3
-rw-r--r--internal/gitaly/service/commit/commit_messages.go3
-rw-r--r--internal/gitaly/service/commit/commit_signatures.go3
-rw-r--r--internal/gitaly/service/commit/commits_helper.go3
-rw-r--r--internal/gitaly/service/commit/filter_shas_with_signatures.go3
-rw-r--r--internal/gitaly/service/commit/find_commits.go3
-rw-r--r--internal/gitaly/service/commit/last_commit_for_path.go3
-rw-r--r--internal/gitaly/service/commit/list_all_commits.go3
-rw-r--r--internal/gitaly/service/commit/list_commits.go3
-rw-r--r--internal/gitaly/service/commit/list_commits_by_oid.go3
-rw-r--r--internal/gitaly/service/commit/list_commits_by_ref_name.go3
-rw-r--r--internal/gitaly/service/commit/list_last_commits_for_tree.go3
-rw-r--r--internal/gitaly/service/commit/tree_entries.go7
-rw-r--r--internal/gitaly/service/commit/tree_entry.go6
-rw-r--r--internal/gitaly/service/operations/tags.go6
-rw-r--r--internal/gitaly/service/ref/find_all_tags.go3
-rw-r--r--internal/gitaly/service/ref/find_all_tags_test.go6
-rw-r--r--internal/gitaly/service/ref/refs.go9
-rw-r--r--internal/gitaly/service/ref/refs_test.go6
-rw-r--r--internal/gitaly/service/ref/remote_branches.go3
-rw-r--r--internal/gitaly/service/ref/tag_messages.go3
-rw-r--r--internal/gitaly/service/ref/tag_signatures.go3
-rw-r--r--internal/gitaly/service/repository/apply_gitattributes.go3
-rw-r--r--internal/gitaly/service/repository/archive.go6
-rw-r--r--internal/gitaly/service/repository/gc.go3
-rw-r--r--internal/gitaly/service/repository/raw_changes.go3
41 files changed, 190 insertions, 205 deletions
diff --git a/internal/git/catfile/cache.go b/internal/git/catfile/cache.go
index 94f163d0f..78c183407 100644
--- a/internal/git/catfile/cache.go
+++ b/internal/git/catfile/cache.go
@@ -14,7 +14,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata"
- "gitlab.com/gitlab-org/labkit/correlation"
)
const (
@@ -34,10 +33,10 @@ const (
type Cache interface {
// ObjectReader either creates a new object reader or returns a cached one for the given
// repository.
- ObjectReader(context.Context, git.RepositoryExecutor) (ObjectReader, error)
+ ObjectReader(context.Context, git.RepositoryExecutor) (ObjectReader, func(), error)
// ObjectInfoReader either creates a new object info reader or returns a cached one for the
// given repository.
- ObjectInfoReader(context.Context, git.RepositoryExecutor) (ObjectInfoReader, error)
+ ObjectInfoReader(context.Context, git.RepositoryExecutor) (ObjectInfoReader, func(), error)
// Evict evicts all cached processes from the cache.
Evict()
}
@@ -67,11 +66,6 @@ type ProcessCache struct {
totalCatfileProcesses prometheus.Counter
catfileLookupCounter *prometheus.CounterVec
catfileCacheMembers *prometheus.GaugeVec
-
- // cachedProcessDone is a condition that gets signalled whenever a process is being
- // considered to be returned to the cache. This field is optional and must only be used in
- // tests.
- cachedProcessDone *sync.Cond
}
// NewCache creates a new catfile process cache.
@@ -175,37 +169,37 @@ func (c *ProcessCache) Stop() {
}
// ObjectReader creates a new ObjectReader process for the given repository.
-func (c *ProcessCache) ObjectReader(ctx context.Context, repo git.RepositoryExecutor) (ObjectReader, error) {
- cacheable, err := c.getOrCreateProcess(ctx, repo, &c.objectReaders, func(ctx context.Context) (cacheable, error) {
+func (c *ProcessCache) ObjectReader(ctx context.Context, repo git.RepositoryExecutor) (ObjectReader, func(), error) {
+ cacheable, cancel, err := c.getOrCreateProcess(ctx, repo, &c.objectReaders, func(ctx context.Context) (cacheable, error) {
return newObjectReader(ctx, repo, c.catfileLookupCounter)
}, "catfile.ObjectReader")
if err != nil {
- return nil, err
+ return nil, nil, err
}
objectReader, ok := cacheable.(ObjectReader)
if !ok {
- return nil, fmt.Errorf("expected object reader, got %T", cacheable)
+ return nil, nil, fmt.Errorf("expected object reader, got %T", cacheable)
}
- return objectReader, nil
+ return objectReader, cancel, nil
}
// ObjectInfoReader creates a new ObjectInfoReader process for the given repository.
-func (c *ProcessCache) ObjectInfoReader(ctx context.Context, repo git.RepositoryExecutor) (ObjectInfoReader, error) {
- cacheable, err := c.getOrCreateProcess(ctx, repo, &c.objectInfoReaders, func(ctx context.Context) (cacheable, error) {
+func (c *ProcessCache) ObjectInfoReader(ctx context.Context, repo git.RepositoryExecutor) (ObjectInfoReader, func(), error) {
+ cacheable, cancel, err := c.getOrCreateProcess(ctx, repo, &c.objectInfoReaders, func(ctx context.Context) (cacheable, error) {
return newObjectInfoReader(ctx, repo, c.catfileLookupCounter)
}, "catfile.ObjectInfoReader")
if err != nil {
- return nil, err
+ return nil, nil, err
}
objectInfoReader, ok := cacheable.(ObjectInfoReader)
if !ok {
- return nil, fmt.Errorf("expected object info reader, got %T", cacheable)
+ return nil, nil, fmt.Errorf("expected object info reader, got %T", cacheable)
}
- return objectInfoReader, nil
+ return objectInfoReader, cancel, nil
}
func (c *ProcessCache) getOrCreateProcess(
@@ -214,15 +208,9 @@ func (c *ProcessCache) getOrCreateProcess(
processes *processes,
create func(context.Context) (cacheable, error),
spanName string,
-) (_ cacheable, returnedErr error) {
- requestDone := ctx.Done()
- if requestDone == nil {
- panic("empty ctx.Done() when creating catfile process")
- }
-
+) (_ cacheable, _ func(), returnedErr error) {
defer c.reportCacheMembers()
- var cancel func()
cacheKey, isCacheable := newCacheKey(metadata.GetValue(ctx, SessionIDField), repo)
if isCacheable {
// We only try to look up cached processes in case it is cacheable, which requires a
@@ -232,25 +220,18 @@ func (c *ProcessCache) getOrCreateProcess(
// poison the cache with broken git-cat-file(1) processes.
if entry, ok := processes.Checkout(cacheKey); ok {
- go c.returnWhenDone(requestDone, processes, cacheKey, entry.value, entry.cancel)
c.catfileCacheCounter.WithLabelValues("hit").Inc()
- return entry.value, nil
+ return entry.value, func() {
+ c.returnToCache(processes, cacheKey, entry.value, entry.cancel)
+ }, nil
}
c.catfileCacheCounter.WithLabelValues("miss").Inc()
// We have not found any cached process, so we need to create a new one. In this
// case, we need to detach the process from the current context such that it does
- // not get killed when the current context is done. Note that while we explicitly
- // `close()` processes in case this function fails, we must have a cancellable
- // context or otherwise our `command` package will panic.
- ctx, cancel = context.WithCancel(context.Background())
- defer func() {
- if returnedErr != nil {
- cancel()
- }
- }()
-
+ // not get killed when the parent context is cancelled.
+ ctx = context.Background()
// We have to decorrelate the process from the current context given that it
// may potentially be reused across different RPC calls.
ctx = correlation.ContextWithCorrelation(ctx, "")
@@ -259,9 +240,20 @@ func (c *ProcessCache) getOrCreateProcess(
span, ctx := opentracing.StartSpanFromContext(ctx, spanName)
+ // Create a new cancellable process context such that we can kill it on demand. If it's a
+ // cached process, then it will only be killed when the cache evicts the entry because we
+ // detached the background further up. If it's an uncached value, we either kill it manually
+ // or via the RPC context's cancellation function.
+ ctx, cancelProcessContext := context.WithCancel(ctx)
+ defer func() {
+ if returnedErr != nil {
+ cancelProcessContext()
+ }
+ }()
+
process, err := create(ctx)
if err != nil {
- return nil, err
+ return nil, nil, err
}
defer func() {
// If we somehow fail after creating a new process, then we want to kill spawned
@@ -273,20 +265,27 @@ func (c *ProcessCache) getOrCreateProcess(
c.totalCatfileProcesses.Inc()
c.currentCatfileProcesses.Inc()
- go func() {
- <-ctx.Done()
+
+ // Note that we must make sure that `cancel` and `closeProcess` are two different variables.
+ // Otherwise if we passed `cancel` to `returnToCache` and then set `cancel` itself to the
+ // function that calls it we would accidentally call ourselves and end up with a segfault.
+ closeProcess := func() {
+ cancelProcessContext()
process.close()
span.Finish()
c.currentCatfileProcesses.Dec()
- }()
+ }
+ cancel := closeProcess
if isCacheable {
// If the process is cacheable, then we want to put the process into the cache when
// the current outer context is done.
- go c.returnWhenDone(requestDone, processes, cacheKey, process, cancel)
+ cancel = func() {
+ c.returnToCache(processes, cacheKey, process, closeProcess)
+ }
}
- return process, nil
+ return process, cancel, nil
}
func (c *ProcessCache) reportCacheMembers() {
@@ -300,16 +299,9 @@ func (c *ProcessCache) Evict() {
c.objectInfoReaders.Evict()
}
-func (c *ProcessCache) returnWhenDone(done <-chan struct{}, p *processes, cacheKey key, value cacheable, cancel func()) {
- <-done
-
+func (c *ProcessCache) returnToCache(p *processes, cacheKey key, value cacheable, cancel func()) {
defer func() {
c.reportCacheMembers()
- if c.cachedProcessDone != nil {
- c.cachedProcessDone.L.Lock()
- defer c.cachedProcessDone.L.Unlock()
- c.cachedProcessDone.Broadcast()
- }
}()
if value == nil || value.isClosed() {
diff --git a/internal/git/catfile/cache_test.go b/internal/git/catfile/cache_test.go
index 61728a25a..0f177be4b 100644
--- a/internal/git/catfile/cache_test.go
+++ b/internal/git/catfile/cache_test.go
@@ -5,7 +5,6 @@ import (
"errors"
"io"
"os"
- "sync"
"testing"
"time"
@@ -183,62 +182,36 @@ func TestCache_ObjectReader(t *testing.T) {
cache := newCache(time.Hour, 10, helper.NewManualTicker())
defer cache.Stop()
- cache.cachedProcessDone = sync.NewCond(&sync.Mutex{})
- t.Run("uncancellable", func(t *testing.T) {
- ctx := testhelper.ContextWithoutCancel()
-
- require.PanicsWithValue(t, "empty ctx.Done() when creating catfile process", func() {
- _, _ = cache.ObjectReader(ctx, repoExecutor)
- })
- })
+ ctx := testhelper.Context(t)
t.Run("uncacheable", func(t *testing.T) {
- ctx, cancel := context.WithCancel(testhelper.Context(t))
-
// The context doesn't carry a session ID and is thus uncacheable.
// The process should never get returned to the cache and must be
// killed on context cancellation.
- reader, err := cache.ObjectReader(ctx, repoExecutor)
+ reader, cancel, err := cache.ObjectReader(ctx, repoExecutor)
require.NoError(t, err)
- objectReaderImpl, ok := reader.(*objectReader)
- require.True(t, ok, "expected object reader")
-
cancel()
- // We're cheating a bit here to avoid creating a racy test by reaching into the
- // process and trying to read from its stdout. If the cancel did kill the process as
- // expected, then the stdout should be closed and we'll get an EOF.
- output, err := io.ReadAll(objectReaderImpl.queue.stdout)
- if err != nil {
- require.True(t, errors.Is(err, os.ErrClosed))
- } else {
- require.NoError(t, err)
- }
- require.Empty(t, output)
-
require.True(t, reader.isClosed())
require.Empty(t, keys(t, &cache.objectReaders))
})
t.Run("cacheable", func(t *testing.T) {
defer cache.Evict()
- ctx, cancel := context.WithCancel(testhelper.Context(t))
- ctx = correlation.ContextWithCorrelation(ctx, "1")
+
+ ctx := correlation.ContextWithCorrelation(ctx, "1")
ctx = testhelper.MergeIncomingMetadata(ctx,
metadata.Pairs(SessionIDField, "1"),
)
- reader, err := cache.ObjectReader(ctx, repoExecutor)
+ reader, cancel, err := cache.ObjectReader(ctx, repoExecutor)
require.NoError(t, err)
// Cancel the context such that the process will be considered for return to the
// cache and wait for the cache to collect it.
- cache.cachedProcessDone.L.Lock()
cancel()
- defer cache.cachedProcessDone.L.Unlock()
- cache.cachedProcessDone.Wait()
keys := keys(t, &cache.objectReaders)
require.Equal(t, []key{{
@@ -254,12 +227,12 @@ func TestCache_ObjectReader(t *testing.T) {
t.Run("dirty process does not get cached", func(t *testing.T) {
defer cache.Evict()
- ctx, cancel := context.WithCancel(testhelper.Context(t))
- ctx = testhelper.MergeIncomingMetadata(ctx,
+
+ ctx := testhelper.MergeIncomingMetadata(ctx,
metadata.Pairs(SessionIDField, "1"),
)
- reader, err := cache.ObjectReader(ctx, repoExecutor)
+ reader, cancel, err := cache.ObjectReader(ctx, repoExecutor)
require.NoError(t, err)
// While we request object data, we do not consume it at all. The reader is thus
@@ -267,12 +240,8 @@ func TestCache_ObjectReader(t *testing.T) {
object, err := reader.Object(ctx, "refs/heads/master")
require.NoError(t, err)
- // Cancel the context such that the process will be considered for return to the
- // cache and wait for the cache to collect it.
- cache.cachedProcessDone.L.Lock()
+ // Cancel the process such that it will be considered for return to the cache.
cancel()
- defer cache.cachedProcessDone.L.Unlock()
- cache.cachedProcessDone.Wait()
require.Empty(t, keys(t, &cache.objectReaders))
@@ -283,24 +252,20 @@ func TestCache_ObjectReader(t *testing.T) {
t.Run("closed process does not get cached", func(t *testing.T) {
defer cache.Evict()
- ctx, cancel := context.WithCancel(testhelper.Context(t))
- ctx = testhelper.MergeIncomingMetadata(ctx,
+
+ ctx := testhelper.MergeIncomingMetadata(ctx,
metadata.Pairs(SessionIDField, "1"),
)
- reader, err := cache.ObjectReader(ctx, repoExecutor)
+ reader, cancel, err := cache.ObjectReader(ctx, repoExecutor)
require.NoError(t, err)
// Closed processes naturally cannot be reused anymore and thus shouldn't ever get
// cached.
reader.close()
- // Cancel the context such that the process will be considered for return to the
- // cache and wait for the cache to collect it.
- cache.cachedProcessDone.L.Lock()
+ // Cancel the process such that it will be considered for return to the cache.
cancel()
- defer cache.cachedProcessDone.L.Unlock()
- cache.cachedProcessDone.Wait()
require.Empty(t, keys(t, &cache.objectReaders))
})
@@ -312,62 +277,35 @@ func TestCache_ObjectInfoReader(t *testing.T) {
cache := newCache(time.Hour, 10, helper.NewManualTicker())
defer cache.Stop()
- cache.cachedProcessDone = sync.NewCond(&sync.Mutex{})
-
- t.Run("uncancellable", func(t *testing.T) {
- ctx := testhelper.ContextWithoutCancel()
- require.PanicsWithValue(t, "empty ctx.Done() when creating catfile process", func() {
- _, _ = cache.ObjectInfoReader(ctx, repoExecutor)
- })
- })
+ ctx := testhelper.Context(t)
t.Run("uncacheable", func(t *testing.T) {
- ctx, cancel := context.WithCancel(testhelper.Context(t))
-
// The context doesn't carry a session ID and is thus uncacheable.
// The process should never get returned to the cache and must be
// killed on context cancellation.
- reader, err := cache.ObjectInfoReader(ctx, repoExecutor)
+ reader, cancel, err := cache.ObjectInfoReader(ctx, repoExecutor)
require.NoError(t, err)
- objectInfoReaderImpl, ok := reader.(*objectInfoReader)
- require.True(t, ok, "expected object reader")
-
cancel()
- // We're cheating a bit here to avoid creating a racy test by reaching into the
- // process and trying to read from its stdout. If the cancel did kill the process as
- // expected, then the stdout should be closed and we'll get an EOF.
- output, err := io.ReadAll(objectInfoReaderImpl.queue.stdout)
- if err != nil {
- require.True(t, errors.Is(err, os.ErrClosed))
- } else {
- require.NoError(t, err)
- }
- require.Empty(t, output)
-
require.True(t, reader.isClosed())
require.Empty(t, keys(t, &cache.objectInfoReaders))
})
t.Run("cacheable", func(t *testing.T) {
defer cache.Evict()
- ctx, cancel := context.WithCancel(testhelper.Context(t))
- ctx = correlation.ContextWithCorrelation(ctx, "1")
+
+ ctx := correlation.ContextWithCorrelation(ctx, "1")
ctx = testhelper.MergeIncomingMetadata(ctx,
metadata.Pairs(SessionIDField, "1"),
)
- reader, err := cache.ObjectInfoReader(ctx, repoExecutor)
+ reader, cancel, err := cache.ObjectInfoReader(ctx, repoExecutor)
require.NoError(t, err)
- // Cancel the context such that the process will be considered for return to the
- // cache and wait for the cache to collect it.
- cache.cachedProcessDone.L.Lock()
+ // Cancel the process such it will be considered for return to the cache.
cancel()
- defer cache.cachedProcessDone.L.Unlock()
- cache.cachedProcessDone.Wait()
keys := keys(t, &cache.objectInfoReaders)
require.Equal(t, []key{{
@@ -383,24 +321,20 @@ func TestCache_ObjectInfoReader(t *testing.T) {
t.Run("closed process does not get cached", func(t *testing.T) {
defer cache.Evict()
- ctx, cancel := context.WithCancel(testhelper.Context(t))
- ctx = testhelper.MergeIncomingMetadata(ctx,
+
+ ctx := testhelper.MergeIncomingMetadata(ctx,
metadata.Pairs(SessionIDField, "1"),
)
- reader, err := cache.ObjectInfoReader(ctx, repoExecutor)
+ reader, cancel, err := cache.ObjectInfoReader(ctx, repoExecutor)
require.NoError(t, err)
// Closed processes naturally cannot be reused anymore and thus shouldn't ever get
// cached.
reader.close()
- // Cancel the context such that the process will be considered for return to the
- // cache and wait for the cache to collect it.
- cache.cachedProcessDone.L.Lock()
+ // Cancel the process such that it will be considered for return to the cache.
cancel()
- defer cache.cachedProcessDone.L.Unlock()
- cache.cachedProcessDone.Wait()
require.Empty(t, keys(t, &cache.objectInfoReaders))
})
diff --git a/internal/git/catfile/testhelper_test.go b/internal/git/catfile/testhelper_test.go
index 9ad072f10..b7e7172cd 100644
--- a/internal/git/catfile/testhelper_test.go
+++ b/internal/git/catfile/testhelper_test.go
@@ -60,8 +60,9 @@ func setupObjectReader(t *testing.T, ctx context.Context) (config.Cfg, ObjectRea
cache := newCache(1*time.Hour, 1000, helper.NewTimerTicker(defaultEvictionInterval))
t.Cleanup(cache.Stop)
- objectReader, err := cache.ObjectReader(ctx, repoExecutor)
+ objectReader, cancel, err := cache.ObjectReader(ctx, repoExecutor)
require.NoError(t, err)
+ t.Cleanup(cancel)
return cfg, objectReader, repo
}
diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go
index 0c774d48d..80a1810d9 100644
--- a/internal/git/gitpipe/catfile_info_test.go
+++ b/internal/git/gitpipe/catfile_info_test.go
@@ -132,8 +132,9 @@ func TestCatfileInfo(t *testing.T) {
catfileCache := catfile.NewCache(cfg)
defer catfileCache.Stop()
- objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
it, err := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(tc.revlistInputs), tc.opts...)
require.NoError(t, err)
diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go
index fa74d2f06..95a9b3ad6 100644
--- a/internal/git/gitpipe/catfile_object_test.go
+++ b/internal/git/gitpipe/catfile_object_test.go
@@ -74,8 +74,9 @@ func TestCatfileObject(t *testing.T) {
catfileCache := catfile.NewCache(cfg)
defer catfileCache.Stop()
- objectReader, err := catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
it, err := CatfileObject(ctx, objectReader, NewCatfileInfoIterator(tc.catfileInfoInputs))
require.NoError(t, err)
diff --git a/internal/git/gitpipe/pipeline_test.go b/internal/git/gitpipe/pipeline_test.go
index 069bb5584..544ad552d 100644
--- a/internal/git/gitpipe/pipeline_test.go
+++ b/internal/git/gitpipe/pipeline_test.go
@@ -224,11 +224,13 @@ func TestPipeline_revlist(t *testing.T) {
catfileCache := catfile.NewCache(cfg)
defer catfileCache.Stop()
- objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
- objectReader, err := catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
revlistIter := Revlist(ctx, repo, tc.revisions, tc.revlistOptions...)
@@ -280,11 +282,13 @@ func TestPipeline_revlist(t *testing.T) {
catfileCache := catfile.NewCache(cfg)
defer catfileCache.Stop()
- objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancelReader, err := catfileCache.ObjectInfoReader(ctx, repo)
require.NoError(t, err)
+ defer cancelReader()
- objectReader, err := catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancelReader, err := catfileCache.ObjectReader(ctx, repo)
require.NoError(t, err)
+ defer cancelReader()
revlistIter := Revlist(ctx, repo, []string{"--all"})
@@ -320,11 +324,13 @@ func TestPipeline_revlist(t *testing.T) {
catfileCache := catfile.NewCache(cfg)
defer catfileCache.Stop()
- objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
- objectReader, err := catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
revlistIter := Revlist(ctx, repo, []string{"--all"}, WithObjects())
@@ -372,11 +378,13 @@ func TestPipeline_forEachRef(t *testing.T) {
catfileCache := catfile.NewCache(cfg)
defer catfileCache.Stop()
- objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
- objectReader, err := catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
forEachRefIter := ForEachRef(ctx, repo, nil)
diff --git a/internal/git/gittest/commit_test.go b/internal/git/gittest/commit_test.go
index 515455252..3494ad382 100644
--- a/internal/git/gittest/commit_test.go
+++ b/internal/git/gittest/commit_test.go
@@ -22,8 +22,9 @@ func TestWriteCommit(t *testing.T) {
catfileCache := catfile.NewCache(cfg)
defer catfileCache.Stop()
- objectReader, err := catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
defaultCommitter := &gitalypb.CommitAuthor{
Name: []byte("Scrooge McDuck"),
diff --git a/internal/git/localrepo/objects.go b/internal/git/localrepo/objects.go
index e5459d13b..fa37ab29e 100644
--- a/internal/git/localrepo/objects.go
+++ b/internal/git/localrepo/objects.go
@@ -226,10 +226,11 @@ func (repo *Repo) ReadCommit(ctx context.Context, revision git.Revision, opts ..
opt(&cfg)
}
- objectReader, err := repo.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := repo.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return nil, err
}
+ defer cancel()
var commit *gitalypb.GitCommit
if cfg.withTrailers {
diff --git a/internal/git/log/parser.go b/internal/git/log/parser.go
index f176fe140..7b7cf5d37 100644
--- a/internal/git/log/parser.go
+++ b/internal/git/log/parser.go
@@ -23,10 +23,10 @@ type Parser struct {
}
// NewParser returns a new Parser
-func NewParser(ctx context.Context, catfileCache catfile.Cache, repo git.RepositoryExecutor, src io.Reader) (*Parser, error) {
- objectReader, err := catfileCache.ObjectReader(ctx, repo)
+func NewParser(ctx context.Context, catfileCache catfile.Cache, repo git.RepositoryExecutor, src io.Reader) (*Parser, func(), error) {
+ objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo)
if err != nil {
- return nil, err
+ return nil, nil, err
}
parser := &Parser{
@@ -34,7 +34,7 @@ func NewParser(ctx context.Context, catfileCache catfile.Cache, repo git.Reposit
objectReader: objectReader,
}
- return parser, nil
+ return parser, cancel, nil
}
// Parse parses a single git log line. It returns true if successful, false if it finished
diff --git a/internal/gitaly/service/blob/blobs.go b/internal/gitaly/service/blob/blobs.go
index 50a8a6274..86acae6a2 100644
--- a/internal/gitaly/service/blob/blobs.go
+++ b/internal/gitaly/service/blob/blobs.go
@@ -98,10 +98,11 @@ func (s *server) processBlobs(
// object iterator. We thus support an optional `catfileInfoIter` parameter: if set,
// we just use that one and ignore the object iterator.
if catfileInfoIter == nil {
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(ctx, repo)
if err != nil {
return helper.ErrInternal(fmt.Errorf("creating object info reader: %w", err))
}
+ defer cancel()
catfileInfoIter, err = gitpipe.CatfileInfo(ctx, objectInfoReader, objectIter)
if err != nil {
@@ -132,10 +133,11 @@ func (s *server) processBlobs(
return helper.ErrInternal(err)
}
} else {
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err))
}
+ defer cancel()
catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, objectIter)
if err != nil {
diff --git a/internal/gitaly/service/blob/get_blob.go b/internal/gitaly/service/blob/get_blob.go
index 99ce66873..d7784ea7b 100644
--- a/internal/gitaly/service/blob/get_blob.go
+++ b/internal/gitaly/service/blob/get_blob.go
@@ -20,10 +20,11 @@ func (s *server) GetBlob(in *gitalypb.GetBlobRequest, stream gitalypb.BlobServic
return helper.ErrInvalidArgumentf("GetBlob: %v", err)
}
- objectReader, err := s.catfileCache.ObjectReader(stream.Context(), repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(stream.Context(), repo)
if err != nil {
return helper.ErrInternalf("GetBlob: %v", err)
}
+ defer cancel()
blob, err := objectReader.Object(ctx, git.Revision(in.Oid))
if err != nil {
diff --git a/internal/gitaly/service/blob/get_blobs.go b/internal/gitaly/service/blob/get_blobs.go
index 5b899ebe7..237c85c17 100644
--- a/internal/gitaly/service/blob/get_blobs.go
+++ b/internal/gitaly/service/blob/get_blobs.go
@@ -159,15 +159,17 @@ func (s *server) GetBlobs(req *gitalypb.GetBlobsRequest, stream gitalypb.BlobSer
repo := s.localrepo(req.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(stream.Context(), repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(stream.Context(), repo)
if err != nil {
return err
}
+ defer cancel()
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(stream.Context(), repo)
+ objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(stream.Context(), repo)
if err != nil {
return err
}
+ defer cancel()
return sendGetBlobsResponse(req, stream, objectReader, objectInfoReader)
}
diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go
index ca05b4750..57027d4c8 100644
--- a/internal/gitaly/service/blob/lfs_pointers.go
+++ b/internal/gitaly/service/blob/lfs_pointers.go
@@ -52,10 +52,11 @@ func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream git
repo := s.localrepo(in.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err))
}
+ defer cancel()
revlistIter := gitpipe.Revlist(ctx, repo, in.GetRevisions(),
gitpipe.WithObjects(),
@@ -94,10 +95,11 @@ func (s *server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stre
},
})
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err))
}
+ defer cancel()
catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo,
gitpipe.WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool {
@@ -137,15 +139,17 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita
},
})
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(ctx, repo)
if err != nil {
return helper.ErrInternal(fmt.Errorf("creating object info reader: %w", err))
}
+ defer cancel()
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err))
}
+ defer cancel()
blobs := make([]gitpipe.RevisionResult, len(req.GetBlobIds()))
for i, blobID := range req.GetBlobIds() {
@@ -160,6 +164,7 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita
if err != nil {
return helper.ErrInternalf("creating object info iterator: %w", err)
}
+ defer cancel()
catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
if err != nil {
diff --git a/internal/gitaly/service/cleanup/apply_bfg_object_map_stream.go b/internal/gitaly/service/cleanup/apply_bfg_object_map_stream.go
index d10c483d1..bcd015bab 100644
--- a/internal/gitaly/service/cleanup/apply_bfg_object_map_stream.go
+++ b/internal/gitaly/service/cleanup/apply_bfg_object_map_stream.go
@@ -40,10 +40,11 @@ func (s *server) ApplyBfgObjectMapStream(server gitalypb.CleanupService_ApplyBfg
reader := &bfgStreamReader{firstRequest: firstRequest, server: server}
chunker := chunk.New(&bfgStreamWriter{server: server})
- notifier, err := notifier.New(ctx, s.catfileCache, repo, chunker)
+ notifier, cancel, err := notifier.New(ctx, s.catfileCache, repo, chunker)
if err != nil {
return helper.ErrInternal(err)
}
+ defer cancel()
// It doesn't matter if new internal references are added after this RPC
// starts running - they shouldn't point to the objects removed by the BFG
diff --git a/internal/gitaly/service/cleanup/notifier/notifier.go b/internal/gitaly/service/cleanup/notifier/notifier.go
index 6b6acff88..140b29eae 100644
--- a/internal/gitaly/service/cleanup/notifier/notifier.go
+++ b/internal/gitaly/service/cleanup/notifier/notifier.go
@@ -17,13 +17,13 @@ type Notifier struct {
}
// New instantiates a new Notifier
-func New(ctx context.Context, catfileCache catfile.Cache, repo git.RepositoryExecutor, chunker *chunk.Chunker) (*Notifier, error) {
- objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo)
+func New(ctx context.Context, catfileCache catfile.Cache, repo git.RepositoryExecutor, chunker *chunk.Chunker) (*Notifier, func(), error) {
+ objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo)
if err != nil {
- return nil, err
+ return nil, nil, err
}
- return &Notifier{objectInfoReader: objectInfoReader, chunker: chunker}, nil
+ return &Notifier{objectInfoReader: objectInfoReader, chunker: chunker}, cancel, nil
}
// Notify builds a new message and sends it to the chunker
diff --git a/internal/gitaly/service/commit/check_objects_exist.go b/internal/gitaly/service/commit/check_objects_exist.go
index baab66f51..83512b340 100644
--- a/internal/gitaly/service/commit/check_objects_exist.go
+++ b/internal/gitaly/service/commit/check_objects_exist.go
@@ -26,13 +26,14 @@ func (s *server) CheckObjectsExist(
return err
}
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(
+ objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(
ctx,
s.localrepo(request.GetRepository()),
)
if err != nil {
return err
}
+ defer cancel()
chunker := chunk.New(&checkObjectsExistSender{stream: stream})
for {
diff --git a/internal/gitaly/service/commit/commit_messages.go b/internal/gitaly/service/commit/commit_messages.go
index 25b334e9e..0e9424f52 100644
--- a/internal/gitaly/service/commit/commit_messages.go
+++ b/internal/gitaly/service/commit/commit_messages.go
@@ -27,10 +27,11 @@ func (s *server) getAndStreamCommitMessages(request *gitalypb.GetCommitMessagesR
ctx := stream.Context()
repo := s.localrepo(request.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return err
}
+ defer cancel()
for _, commitID := range request.GetCommitIds() {
msg, err := catfile.GetCommitMessage(ctx, objectReader, repo, git.Revision(commitID))
diff --git a/internal/gitaly/service/commit/commit_signatures.go b/internal/gitaly/service/commit/commit_signatures.go
index f2e3a7ac4..91ea5bf7c 100644
--- a/internal/gitaly/service/commit/commit_signatures.go
+++ b/internal/gitaly/service/commit/commit_signatures.go
@@ -30,10 +30,11 @@ func (s *server) getCommitSignatures(request *gitalypb.GetCommitSignaturesReques
ctx := stream.Context()
repo := s.localrepo(request.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternal(err)
}
+ defer cancel()
for _, commitID := range request.CommitIds {
commitObj, err := objectReader.Object(ctx, git.Revision(commitID)+"^{commit}")
diff --git a/internal/gitaly/service/commit/commits_helper.go b/internal/gitaly/service/commit/commits_helper.go
index cb5715992..2287d9674 100644
--- a/internal/gitaly/service/commit/commits_helper.go
+++ b/internal/gitaly/service/commit/commits_helper.go
@@ -29,10 +29,11 @@ func (s *server) sendCommits(
return err
}
- logParser, err := log.NewParser(ctx, s.catfileCache, repo, cmd)
+ logParser, cancel, err := log.NewParser(ctx, s.catfileCache, repo, cmd)
if err != nil {
return err
}
+ defer cancel()
chunker := chunk.New(sender)
for logParser.Parse(ctx) {
diff --git a/internal/gitaly/service/commit/filter_shas_with_signatures.go b/internal/gitaly/service/commit/filter_shas_with_signatures.go
index 09d22d881..3caa401fe 100644
--- a/internal/gitaly/service/commit/filter_shas_with_signatures.go
+++ b/internal/gitaly/service/commit/filter_shas_with_signatures.go
@@ -38,10 +38,11 @@ func (s *server) filterShasWithSignatures(bidi gitalypb.CommitService_FilterShas
ctx := bidi.Context()
repo := s.localrepo(firstRequest.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return err
}
+ defer cancel()
request := firstRequest
for {
diff --git a/internal/gitaly/service/commit/find_commits.go b/internal/gitaly/service/commit/find_commits.go
index 8d6aaf048..12c737b52 100644
--- a/internal/gitaly/service/commit/find_commits.go
+++ b/internal/gitaly/service/commit/find_commits.go
@@ -58,10 +58,11 @@ func (s *server) findCommits(ctx context.Context, req *gitalypb.FindCommitsReque
return fmt.Errorf("error when creating git log command: %v", err)
}
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return fmt.Errorf("creating catfile: %v", err)
}
+ defer cancel()
getCommits := NewGetCommits(logCmd, objectReader)
diff --git a/internal/gitaly/service/commit/last_commit_for_path.go b/internal/gitaly/service/commit/last_commit_for_path.go
index 9b8468c92..a95bfdcf3 100644
--- a/internal/gitaly/service/commit/last_commit_for_path.go
+++ b/internal/gitaly/service/commit/last_commit_for_path.go
@@ -29,10 +29,11 @@ func (s *server) lastCommitForPath(ctx context.Context, in *gitalypb.LastCommitF
}
repo := s.localrepo(in.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return nil, err
}
+ defer cancel()
options := in.GetGlobalOptions()
diff --git a/internal/gitaly/service/commit/list_all_commits.go b/internal/gitaly/service/commit/list_all_commits.go
index d5fa18622..1709f5cf6 100644
--- a/internal/gitaly/service/commit/list_all_commits.go
+++ b/internal/gitaly/service/commit/list_all_commits.go
@@ -30,10 +30,11 @@ func (s *server) ListAllCommits(
ctx := stream.Context()
repo := s.localrepo(request.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternalf("creating object reader: %w", err)
}
+ defer cancel()
// If we've got a pagination token, then we will only start to print commits as soon as
// we've seen the token.
diff --git a/internal/gitaly/service/commit/list_commits.go b/internal/gitaly/service/commit/list_commits.go
index 826edb9ee..830e1bf46 100644
--- a/internal/gitaly/service/commit/list_commits.go
+++ b/internal/gitaly/service/commit/list_commits.go
@@ -39,10 +39,11 @@ func (s *server) ListCommits(
ctx := stream.Context()
repo := s.localrepo(request.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err))
}
+ defer cancel()
revlistOptions := []gitpipe.RevlistOption{}
diff --git a/internal/gitaly/service/commit/list_commits_by_oid.go b/internal/gitaly/service/commit/list_commits_by_oid.go
index 7ef89f58a..4c736b2d8 100644
--- a/internal/gitaly/service/commit/list_commits_by_oid.go
+++ b/internal/gitaly/service/commit/list_commits_by_oid.go
@@ -25,10 +25,11 @@ func (s *server) ListCommitsByOid(in *gitalypb.ListCommitsByOidRequest, stream g
ctx := stream.Context()
repo := s.localrepo(in.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return err
}
+ defer cancel()
sender := chunk.New(&commitsByOidSender{stream: stream})
listCommitsbyOidHistogram.Observe(float64(len(in.Oid)))
diff --git a/internal/gitaly/service/commit/list_commits_by_ref_name.go b/internal/gitaly/service/commit/list_commits_by_ref_name.go
index f6d84e1cd..5b0784f77 100644
--- a/internal/gitaly/service/commit/list_commits_by_ref_name.go
+++ b/internal/gitaly/service/commit/list_commits_by_ref_name.go
@@ -13,10 +13,11 @@ func (s *server) ListCommitsByRefName(in *gitalypb.ListCommitsByRefNameRequest,
ctx := stream.Context()
repo := s.localrepo(in.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternal(err)
}
+ defer cancel()
sender := chunk.New(&commitsByRefNameSender{stream: stream})
diff --git a/internal/gitaly/service/commit/list_last_commits_for_tree.go b/internal/gitaly/service/commit/list_last_commits_for_tree.go
index 99c3ea0a9..9d0a7a64e 100644
--- a/internal/gitaly/service/commit/list_last_commits_for_tree.go
+++ b/internal/gitaly/service/commit/list_last_commits_for_tree.go
@@ -35,10 +35,11 @@ func (s *server) listLastCommitsForTree(in *gitalypb.ListLastCommitsForTreeReque
ctx := stream.Context()
repo := s.localrepo(in.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return err
}
+ defer cancel()
batch := make([]*gitalypb.ListLastCommitsForTreeResponse_CommitForTree, 0, maxNumStatBatchSize)
entries, err := getLSTreeEntries(parser)
diff --git a/internal/gitaly/service/commit/tree_entries.go b/internal/gitaly/service/commit/tree_entries.go
index 96bf79693..ab252eecf 100644
--- a/internal/gitaly/service/commit/tree_entries.go
+++ b/internal/gitaly/service/commit/tree_entries.go
@@ -147,16 +147,19 @@ func (s *server) sendTreeEntries(
}
} else {
var err error
+ var cancel func()
- objectReader, err = s.catfileCache.ObjectReader(stream.Context(), repo)
+ objectReader, cancel, err = s.catfileCache.ObjectReader(stream.Context(), repo)
if err != nil {
return err
}
+ defer cancel()
- objectInfoReader, err = s.catfileCache.ObjectInfoReader(stream.Context(), repo)
+ objectInfoReader, cancel, err = s.catfileCache.ObjectInfoReader(stream.Context(), repo)
if err != nil {
return err
}
+ defer cancel()
entries, err = catfile.TreeEntries(ctx, objectReader, objectInfoReader, revision, path)
if err != nil {
diff --git a/internal/gitaly/service/commit/tree_entry.go b/internal/gitaly/service/commit/tree_entry.go
index 2573fe3c0..9203931c4 100644
--- a/internal/gitaly/service/commit/tree_entry.go
+++ b/internal/gitaly/service/commit/tree_entry.go
@@ -132,15 +132,17 @@ func (s *server) TreeEntry(in *gitalypb.TreeEntryRequest, stream gitalypb.Commit
requestPath = strings.TrimRight(requestPath, "/")
}
- objectReader, err := s.catfileCache.ObjectReader(stream.Context(), repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(stream.Context(), repo)
if err != nil {
return err
}
+ defer cancel()
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(stream.Context(), repo)
+ objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(stream.Context(), repo)
if err != nil {
return err
}
+ defer cancel()
return sendTreeEntry(stream, objectReader, objectInfoReader, string(in.GetRevision()), requestPath, in.GetLimit(), in.GetMaxSize())
}
diff --git a/internal/gitaly/service/operations/tags.go b/internal/gitaly/service/operations/tags.go
index 68acb036a..678e1d68c 100644
--- a/internal/gitaly/service/operations/tags.go
+++ b/internal/gitaly/service/operations/tags.go
@@ -178,15 +178,17 @@ func (s *Server) createTag(
committer *gitalypb.User,
committerTime time.Time,
) (*gitalypb.Tag, git.ObjectID, error) {
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return nil, "", status.Error(codes.Internal, err.Error())
}
+ defer cancel()
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(ctx, repo)
if err != nil {
return nil, "", status.Error(codes.Internal, err.Error())
}
+ defer cancel()
// We allow all ways to name a revision that cat-file
// supports, not just OID. Resolve it.
diff --git a/internal/gitaly/service/ref/find_all_tags.go b/internal/gitaly/service/ref/find_all_tags.go
index 4bfb2d0e9..1bf51fde9 100644
--- a/internal/gitaly/service/ref/find_all_tags.go
+++ b/internal/gitaly/service/ref/find_all_tags.go
@@ -40,10 +40,11 @@ func (s *server) FindAllTags(in *gitalypb.FindAllTagsRequest, stream gitalypb.Re
}
func (s *server) findAllTags(ctx context.Context, repo *localrepo.Repo, sortField string, stream gitalypb.RefService_FindAllTagsServer, opts *paginationOpts) error {
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return fmt.Errorf("error creating object reader: %v", err)
}
+ defer cancel()
forEachRefIter := gitpipe.ForEachRef(
ctx,
diff --git a/internal/gitaly/service/ref/find_all_tags_test.go b/internal/gitaly/service/ref/find_all_tags_test.go
index 487790b7d..b562d74fe 100644
--- a/internal/gitaly/service/ref/find_all_tags_test.go
+++ b/internal/gitaly/service/ref/find_all_tags_test.go
@@ -403,11 +403,13 @@ func TestFindAllTags_nestedTags(t *testing.T) {
catfileCache := catfile.NewCache(cfg)
defer catfileCache.Stop()
- objectReader, err := catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
- objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
info, err := objectInfoReader.Info(ctx, git.Revision(tc.originalOid))
require.NoError(t, err)
diff --git a/internal/gitaly/service/ref/refs.go b/internal/gitaly/service/ref/refs.go
index 63df4f665..76cebb1b8 100644
--- a/internal/gitaly/service/ref/refs.go
+++ b/internal/gitaly/service/ref/refs.go
@@ -109,10 +109,11 @@ func (s *server) findLocalBranches(in *gitalypb.FindLocalBranchesRequest, stream
ctx := stream.Context()
repo := s.localrepo(in.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return err
}
+ defer cancel()
writer := newFindLocalBranchesWriter(stream, objectReader)
opts := buildFindRefsOpts(ctx, in.GetPaginationParams())
@@ -161,10 +162,11 @@ func (s *server) findAllBranches(in *gitalypb.FindAllBranchesRequest, stream git
}
ctx := stream.Context()
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return err
}
+ defer cancel()
opts := buildFindRefsOpts(ctx, nil)
opts.cmdArgs = args
@@ -240,10 +242,11 @@ func (s *server) findTag(ctx context.Context, repo git.RepositoryExecutor, tagNa
return nil, fmt.Errorf("for-each-ref error: %v", err)
}
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return nil, err
}
+ defer cancel()
var tag *gitalypb.Tag
diff --git a/internal/gitaly/service/ref/refs_test.go b/internal/gitaly/service/ref/refs_test.go
index 7278f7767..bac8b3e81 100644
--- a/internal/gitaly/service/ref/refs_test.go
+++ b/internal/gitaly/service/ref/refs_test.go
@@ -1129,11 +1129,13 @@ func TestFindTagNestedTag(t *testing.T) {
catfileCache := catfile.NewCache(cfg)
defer catfileCache.Stop()
- objectReader, err := catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
- objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
info, err := objectInfoReader.Info(ctx, git.Revision(tc.originalOid))
require.NoError(t, err)
diff --git a/internal/gitaly/service/ref/remote_branches.go b/internal/gitaly/service/ref/remote_branches.go
index 57ed60999..343950e87 100644
--- a/internal/gitaly/service/ref/remote_branches.go
+++ b/internal/gitaly/service/ref/remote_branches.go
@@ -31,10 +31,11 @@ func (s *server) findAllRemoteBranches(req *gitalypb.FindAllRemoteBranchesReques
patterns := []string{"refs/remotes/" + req.GetRemoteName()}
ctx := stream.Context()
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return err
}
+ defer cancel()
opts := buildFindRefsOpts(ctx, nil)
opts.cmdArgs = args
diff --git a/internal/gitaly/service/ref/tag_messages.go b/internal/gitaly/service/ref/tag_messages.go
index 6f885eded..895a5098b 100644
--- a/internal/gitaly/service/ref/tag_messages.go
+++ b/internal/gitaly/service/ref/tag_messages.go
@@ -37,10 +37,11 @@ func (s *server) getAndStreamTagMessages(request *gitalypb.GetTagMessagesRequest
ctx := stream.Context()
repo := s.localrepo(request.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return err
}
+ defer cancel()
for _, tagID := range request.GetTagIds() {
tag, err := catfile.GetTag(ctx, objectReader, git.Revision(tagID), "")
diff --git a/internal/gitaly/service/ref/tag_signatures.go b/internal/gitaly/service/ref/tag_signatures.go
index 8bdf78479..7b05640d2 100644
--- a/internal/gitaly/service/ref/tag_signatures.go
+++ b/internal/gitaly/service/ref/tag_signatures.go
@@ -39,10 +39,11 @@ func (s *server) GetTagSignatures(req *gitalypb.GetTagSignaturesRequest, stream
ctx := stream.Context()
repo := s.localrepo(req.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternalf("creating object reader: %w", err)
}
+ defer cancel()
chunker := chunk.New(&tagSignatureSender{
send: func(signatures []*gitalypb.GetTagSignaturesResponse_TagSignature) error {
diff --git a/internal/gitaly/service/repository/apply_gitattributes.go b/internal/gitaly/service/repository/apply_gitattributes.go
index 5167176e7..f2ec2e422 100644
--- a/internal/gitaly/service/repository/apply_gitattributes.go
+++ b/internal/gitaly/service/repository/apply_gitattributes.go
@@ -132,10 +132,11 @@ func (s *server) ApplyGitattributes(ctx context.Context, in *gitalypb.ApplyGitat
return nil, helper.ErrInvalidArgumentf("revision: %v", err)
}
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return nil, err
}
+ defer cancel()
if err := s.applyGitattributes(ctx, repo, objectReader, repoPath, in.GetRevision()); err != nil {
return nil, err
diff --git a/internal/gitaly/service/repository/archive.go b/internal/gitaly/service/repository/archive.go
index 878572f91..6c37e1b2c 100644
--- a/internal/gitaly/service/repository/archive.go
+++ b/internal/gitaly/service/repository/archive.go
@@ -147,15 +147,17 @@ func (s *server) validateGetArchivePrecondition(
path string,
exclude []string,
) error {
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return err
}
+ defer cancel()
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(ctx, repo)
if err != nil {
return err
}
+ defer cancel()
f := catfile.NewTreeEntryFinder(objectReader, objectInfoReader)
if path != "." {
diff --git a/internal/gitaly/service/repository/gc.go b/internal/gitaly/service/repository/gc.go
index 6e9d3533f..09117f516 100644
--- a/internal/gitaly/service/repository/gc.go
+++ b/internal/gitaly/service/repository/gc.go
@@ -96,10 +96,11 @@ func (s *server) cleanupKeepArounds(ctx context.Context, repo *localrepo.Repo) e
return nil
}
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(ctx, repo)
if err != nil {
return nil
}
+ defer cancel()
keepAroundsPrefix := "refs/keep-around"
keepAroundsPath := filepath.Join(repoPath, keepAroundsPrefix)
diff --git a/internal/gitaly/service/repository/raw_changes.go b/internal/gitaly/service/repository/raw_changes.go
index fb1c2c5a1..5c3184b88 100644
--- a/internal/gitaly/service/repository/raw_changes.go
+++ b/internal/gitaly/service/repository/raw_changes.go
@@ -20,10 +20,11 @@ func (s *server) GetRawChanges(req *gitalypb.GetRawChangesRequest, stream gitaly
ctx := stream.Context()
repo := s.localrepo(req.GetRepository())
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(stream.Context(), repo)
+ objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(stream.Context(), repo)
if err != nil {
return helper.ErrInternal(err)
}
+ defer cancel()
if err := validateRawChangesRequest(ctx, req, objectInfoReader); err != nil {
return helper.ErrInvalidArgument(err)