Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2022-04-29 17:42:22 +0300
committerJohn Cai <jcai@gitlab.com>2022-04-29 17:42:22 +0300
commitb3a0b630bd33b6f23902203d5443c7518b316b41 (patch)
tree425e3fb7e8cb6318f38ac2923a29a03c05e40085
parent89d1dfc6c5d03e929ba4f419043577be25869d4f (diff)
parent77b8ced5bd60eebd0614e5ab408d048ab1116d43 (diff)
Merge branch 'pks-catfile-fix-flaky-process-termination-test' into 'master'
catfile: Make process cancellation synchronous to fix test race See merge request gitlab-org/gitaly!4500
-rw-r--r--internal/git/catfile/cache.go127
-rw-r--r--internal/git/catfile/cache_test.go112
-rw-r--r--internal/git/catfile/object_info_reader.go9
-rw-r--r--internal/git/catfile/object_reader.go8
-rw-r--r--internal/git/catfile/testhelper_test.go3
-rw-r--r--internal/git/gitpipe/catfile_info_test.go3
-rw-r--r--internal/git/gitpipe/catfile_object_test.go3
-rw-r--r--internal/git/gitpipe/pipeline_test.go24
-rw-r--r--internal/git/gittest/commit_test.go3
-rw-r--r--internal/git/localrepo/objects.go3
-rw-r--r--internal/git/log/parser.go8
-rw-r--r--internal/gitaly/service/blob/blobs.go6
-rw-r--r--internal/gitaly/service/blob/get_blob.go3
-rw-r--r--internal/gitaly/service/blob/get_blobs.go6
-rw-r--r--internal/gitaly/service/blob/lfs_pointers.go13
-rw-r--r--internal/gitaly/service/cleanup/apply_bfg_object_map_stream.go3
-rw-r--r--internal/gitaly/service/cleanup/notifier/notifier.go8
-rw-r--r--internal/gitaly/service/commit/check_objects_exist.go3
-rw-r--r--internal/gitaly/service/commit/commit_messages.go3
-rw-r--r--internal/gitaly/service/commit/commit_signatures.go3
-rw-r--r--internal/gitaly/service/commit/commits_helper.go3
-rw-r--r--internal/gitaly/service/commit/filter_shas_with_signatures.go3
-rw-r--r--internal/gitaly/service/commit/find_commits.go3
-rw-r--r--internal/gitaly/service/commit/last_commit_for_path.go3
-rw-r--r--internal/gitaly/service/commit/list_all_commits.go3
-rw-r--r--internal/gitaly/service/commit/list_commits.go3
-rw-r--r--internal/gitaly/service/commit/list_commits_by_oid.go3
-rw-r--r--internal/gitaly/service/commit/list_commits_by_ref_name.go3
-rw-r--r--internal/gitaly/service/commit/list_last_commits_for_tree.go3
-rw-r--r--internal/gitaly/service/commit/tree_entries.go7
-rw-r--r--internal/gitaly/service/commit/tree_entry.go6
-rw-r--r--internal/gitaly/service/operations/tags.go6
-rw-r--r--internal/gitaly/service/ref/find_all_tags.go3
-rw-r--r--internal/gitaly/service/ref/find_all_tags_test.go6
-rw-r--r--internal/gitaly/service/ref/refs.go9
-rw-r--r--internal/gitaly/service/ref/refs_test.go6
-rw-r--r--internal/gitaly/service/ref/remote_branches.go3
-rw-r--r--internal/gitaly/service/ref/tag_messages.go3
-rw-r--r--internal/gitaly/service/ref/tag_signatures.go3
-rw-r--r--internal/gitaly/service/repository/apply_gitattributes.go3
-rw-r--r--internal/gitaly/service/repository/archive.go6
-rw-r--r--internal/gitaly/service/repository/gc.go3
-rw-r--r--internal/gitaly/service/repository/raw_changes.go3
43 files changed, 213 insertions, 232 deletions
diff --git a/internal/git/catfile/cache.go b/internal/git/catfile/cache.go
index e86248feb..293ec3e65 100644
--- a/internal/git/catfile/cache.go
+++ b/internal/git/catfile/cache.go
@@ -34,10 +34,10 @@ const (
type Cache interface {
// ObjectReader either creates a new object reader or returns a cached one for the given
// repository.
- ObjectReader(context.Context, git.RepositoryExecutor) (ObjectReader, error)
+ ObjectReader(context.Context, git.RepositoryExecutor) (ObjectReader, func(), error)
// ObjectInfoReader either creates a new object info reader or returns a cached one for the
// given repository.
- ObjectInfoReader(context.Context, git.RepositoryExecutor) (ObjectInfoReader, error)
+ ObjectInfoReader(context.Context, git.RepositoryExecutor) (ObjectInfoReader, func(), error)
// Evict evicts all cached processes from the cache.
Evict()
}
@@ -67,11 +67,6 @@ type ProcessCache struct {
totalCatfileProcesses prometheus.Counter
catfileLookupCounter *prometheus.CounterVec
catfileCacheMembers *prometheus.GaugeVec
-
- // cachedProcessDone is a condition that gets signalled whenever a process is being
- // considered to be returned to the cache. This field is optional and must only be used in
- // tests.
- cachedProcessDone *sync.Cond
}
// NewCache creates a new catfile process cache.
@@ -175,37 +170,37 @@ func (c *ProcessCache) Stop() {
}
// ObjectReader creates a new ObjectReader process for the given repository.
-func (c *ProcessCache) ObjectReader(ctx context.Context, repo git.RepositoryExecutor) (ObjectReader, error) {
- cacheable, err := c.getOrCreateProcess(ctx, repo, &c.objectReaders, func(ctx context.Context) (cacheable, error) {
+func (c *ProcessCache) ObjectReader(ctx context.Context, repo git.RepositoryExecutor) (ObjectReader, func(), error) {
+ cacheable, cancel, err := c.getOrCreateProcess(ctx, repo, &c.objectReaders, func(ctx context.Context) (cacheable, error) {
return newObjectReader(ctx, repo, c.catfileLookupCounter)
- })
+ }, "catfile.ObjectReader")
if err != nil {
- return nil, err
+ return nil, nil, err
}
objectReader, ok := cacheable.(ObjectReader)
if !ok {
- return nil, fmt.Errorf("expected object reader, got %T", cacheable)
+ return nil, nil, fmt.Errorf("expected object reader, got %T", cacheable)
}
- return objectReader, nil
+ return objectReader, cancel, nil
}
// ObjectInfoReader creates a new ObjectInfoReader process for the given repository.
-func (c *ProcessCache) ObjectInfoReader(ctx context.Context, repo git.RepositoryExecutor) (ObjectInfoReader, error) {
- cacheable, err := c.getOrCreateProcess(ctx, repo, &c.objectInfoReaders, func(ctx context.Context) (cacheable, error) {
+func (c *ProcessCache) ObjectInfoReader(ctx context.Context, repo git.RepositoryExecutor) (ObjectInfoReader, func(), error) {
+ cacheable, cancel, err := c.getOrCreateProcess(ctx, repo, &c.objectInfoReaders, func(ctx context.Context) (cacheable, error) {
return newObjectInfoReader(ctx, repo, c.catfileLookupCounter)
- })
+ }, "catfile.ObjectInfoReader")
if err != nil {
- return nil, err
+ return nil, nil, err
}
objectInfoReader, ok := cacheable.(ObjectInfoReader)
if !ok {
- return nil, fmt.Errorf("expected object info reader, got %T", cacheable)
+ return nil, nil, fmt.Errorf("expected object info reader, got %T", cacheable)
}
- return objectInfoReader, nil
+ return objectInfoReader, cancel, nil
}
func (c *ProcessCache) getOrCreateProcess(
@@ -213,75 +208,92 @@ func (c *ProcessCache) getOrCreateProcess(
repo repository.GitRepo,
processes *processes,
create func(context.Context) (cacheable, error),
-) (_ cacheable, returnedErr error) {
- requestDone := ctx.Done()
- if requestDone == nil {
- panic("empty ctx.Done() in catfile.Batch.New()")
- }
-
+ spanName string,
+) (_ cacheable, _ func(), returnedErr error) {
defer c.reportCacheMembers()
- var cancel func()
cacheKey, isCacheable := newCacheKey(metadata.GetValue(ctx, SessionIDField), repo)
if isCacheable {
- // We only try to look up cached batch processes in case it is cacheable, which
- // requires a session ID. This is mostly done such that git-cat-file(1) processes
- // from one user cannot interfer with those from another user. The main intent is to
- // disallow trivial denial of service attacks against other users in case it is
- // possible to poison the cache with broken git-cat-file(1) processes.
+ // We only try to look up cached processes in case it is cacheable, which requires a
+ // session ID. This is mostly done such that git-cat-file(1) processes from one user
+ // cannot interfere with those from another user. The main intent is to disallow
+ // trivial denial of service attacks against other users in case it is possible to
+ // poison the cache with broken git-cat-file(1) processes.
if entry, ok := processes.Checkout(cacheKey); ok {
- go c.returnWhenDone(requestDone, processes, cacheKey, entry.value, entry.cancel)
c.catfileCacheCounter.WithLabelValues("hit").Inc()
- return entry.value, nil
+ return entry.value, func() {
+ c.returnToCache(processes, cacheKey, entry.value, entry.cancel)
+ }, nil
}
c.catfileCacheCounter.WithLabelValues("miss").Inc()
// We have not found any cached process, so we need to create a new one. In this
// case, we need to detach the process from the current context such that it does
- // not get killed when the current context is done. Note that while we explicitly
- // `close()` processes in case this function fails, we must have a cancellable
- // context or otherwise our `command` package will panic.
- ctx, cancel = context.WithCancel(context.Background())
- defer func() {
- if returnedErr != nil {
- cancel()
- }
- }()
-
+ // not get killed when the parent context is cancelled.
+ //
+ // Note that we explicitly retain feature flags here, which means that cached
+ // processes may retain flags for some time which have been changed meanwhile. While
+ // not ideal, it feels better compared to just ignoring feature flags altogether.
+ // The latter would mean that we cannot use flags in the catfile code, but more
+ // importantly we also wouldn't be able to use feature-flagged Git version upgrades
+ // for catfile processes.
+ ctx = helper.SuppressCancellation(ctx)
// We have to decorrelate the process from the current context given that it
// may potentially be reused across different RPC calls.
ctx = correlation.ContextWithCorrelation(ctx, "")
ctx = opentracing.ContextWithSpan(ctx, nil)
}
- batch, err := create(ctx)
+ span, ctx := opentracing.StartSpanFromContext(ctx, spanName)
+
+ // Create a new cancellable process context such that we can kill it on demand. If it's a
+ // cached process, then it will only be killed when the cache evicts the entry because we
+ // detached the background further up. If it's an uncached value, we either kill it manually
+ // or via the RPC context's cancellation function.
+ ctx, cancelProcessContext := context.WithCancel(ctx)
+ defer func() {
+ if returnedErr != nil {
+ cancelProcessContext()
+ }
+ }()
+
+ process, err := create(ctx)
if err != nil {
- return nil, err
+ return nil, nil, err
}
defer func() {
- // If we somehow fail after creating a new Batch process, then we want to kill
- // spawned processes right away.
+ // If we somehow fail after creating a new process, then we want to kill spawned
+ // processes right away.
if returnedErr != nil {
- batch.close()
+ process.close()
}
}()
c.totalCatfileProcesses.Inc()
c.currentCatfileProcesses.Inc()
- go func() {
- <-ctx.Done()
+
+ // Note that we must make sure that `cancel` and `closeProcess` are two different variables.
+ // Otherwise if we passed `cancel` to `returnToCache` and then set `cancel` itself to the
+ // function that calls it we would accidentally call ourselves and end up with a segfault.
+ closeProcess := func() {
+ cancelProcessContext()
+ process.close()
+ span.Finish()
c.currentCatfileProcesses.Dec()
- }()
+ }
+ cancel := closeProcess
if isCacheable {
// If the process is cacheable, then we want to put the process into the cache when
// the current outer context is done.
- go c.returnWhenDone(requestDone, processes, cacheKey, batch, cancel)
+ cancel = func() {
+ c.returnToCache(processes, cacheKey, process, closeProcess)
+ }
}
- return batch, nil
+ return process, cancel, nil
}
func (c *ProcessCache) reportCacheMembers() {
@@ -295,16 +307,9 @@ func (c *ProcessCache) Evict() {
c.objectInfoReaders.Evict()
}
-func (c *ProcessCache) returnWhenDone(done <-chan struct{}, p *processes, cacheKey key, value cacheable, cancel func()) {
- <-done
-
+func (c *ProcessCache) returnToCache(p *processes, cacheKey key, value cacheable, cancel func()) {
defer func() {
c.reportCacheMembers()
- if c.cachedProcessDone != nil {
- c.cachedProcessDone.L.Lock()
- defer c.cachedProcessDone.L.Unlock()
- c.cachedProcessDone.Broadcast()
- }
}()
if value == nil || value.isClosed() {
diff --git a/internal/git/catfile/cache_test.go b/internal/git/catfile/cache_test.go
index b90124ec8..0f177be4b 100644
--- a/internal/git/catfile/cache_test.go
+++ b/internal/git/catfile/cache_test.go
@@ -5,7 +5,6 @@ import (
"errors"
"io"
"os"
- "sync"
"testing"
"time"
@@ -183,62 +182,36 @@ func TestCache_ObjectReader(t *testing.T) {
cache := newCache(time.Hour, 10, helper.NewManualTicker())
defer cache.Stop()
- cache.cachedProcessDone = sync.NewCond(&sync.Mutex{})
- t.Run("uncancellable", func(t *testing.T) {
- ctx := testhelper.ContextWithoutCancel()
-
- require.PanicsWithValue(t, "empty ctx.Done() in catfile.Batch.New()", func() {
- _, _ = cache.ObjectReader(ctx, repoExecutor)
- })
- })
+ ctx := testhelper.Context(t)
t.Run("uncacheable", func(t *testing.T) {
- ctx, cancel := context.WithCancel(testhelper.Context(t))
-
// The context doesn't carry a session ID and is thus uncacheable.
// The process should never get returned to the cache and must be
// killed on context cancellation.
- reader, err := cache.ObjectReader(ctx, repoExecutor)
+ reader, cancel, err := cache.ObjectReader(ctx, repoExecutor)
require.NoError(t, err)
- objectReaderImpl, ok := reader.(*objectReader)
- require.True(t, ok, "expected object reader")
-
cancel()
- // We're cheating a bit here to avoid creating a racy test by reaching into the
- // process and trying to read from its stdout. If the cancel did kill the process as
- // expected, then the stdout should be closed and we'll get an EOF.
- output, err := io.ReadAll(objectReaderImpl.queue.stdout)
- if err != nil {
- require.True(t, errors.Is(err, os.ErrClosed))
- } else {
- require.NoError(t, err)
- }
- require.Empty(t, output)
-
require.True(t, reader.isClosed())
require.Empty(t, keys(t, &cache.objectReaders))
})
t.Run("cacheable", func(t *testing.T) {
defer cache.Evict()
- ctx, cancel := context.WithCancel(testhelper.Context(t))
- ctx = correlation.ContextWithCorrelation(ctx, "1")
+
+ ctx := correlation.ContextWithCorrelation(ctx, "1")
ctx = testhelper.MergeIncomingMetadata(ctx,
metadata.Pairs(SessionIDField, "1"),
)
- reader, err := cache.ObjectReader(ctx, repoExecutor)
+ reader, cancel, err := cache.ObjectReader(ctx, repoExecutor)
require.NoError(t, err)
// Cancel the context such that the process will be considered for return to the
// cache and wait for the cache to collect it.
- cache.cachedProcessDone.L.Lock()
cancel()
- defer cache.cachedProcessDone.L.Unlock()
- cache.cachedProcessDone.Wait()
keys := keys(t, &cache.objectReaders)
require.Equal(t, []key{{
@@ -254,12 +227,12 @@ func TestCache_ObjectReader(t *testing.T) {
t.Run("dirty process does not get cached", func(t *testing.T) {
defer cache.Evict()
- ctx, cancel := context.WithCancel(testhelper.Context(t))
- ctx = testhelper.MergeIncomingMetadata(ctx,
+
+ ctx := testhelper.MergeIncomingMetadata(ctx,
metadata.Pairs(SessionIDField, "1"),
)
- reader, err := cache.ObjectReader(ctx, repoExecutor)
+ reader, cancel, err := cache.ObjectReader(ctx, repoExecutor)
require.NoError(t, err)
// While we request object data, we do not consume it at all. The reader is thus
@@ -267,12 +240,8 @@ func TestCache_ObjectReader(t *testing.T) {
object, err := reader.Object(ctx, "refs/heads/master")
require.NoError(t, err)
- // Cancel the context such that the process will be considered for return to the
- // cache and wait for the cache to collect it.
- cache.cachedProcessDone.L.Lock()
+ // Cancel the process such that it will be considered for return to the cache.
cancel()
- defer cache.cachedProcessDone.L.Unlock()
- cache.cachedProcessDone.Wait()
require.Empty(t, keys(t, &cache.objectReaders))
@@ -283,24 +252,20 @@ func TestCache_ObjectReader(t *testing.T) {
t.Run("closed process does not get cached", func(t *testing.T) {
defer cache.Evict()
- ctx, cancel := context.WithCancel(testhelper.Context(t))
- ctx = testhelper.MergeIncomingMetadata(ctx,
+
+ ctx := testhelper.MergeIncomingMetadata(ctx,
metadata.Pairs(SessionIDField, "1"),
)
- reader, err := cache.ObjectReader(ctx, repoExecutor)
+ reader, cancel, err := cache.ObjectReader(ctx, repoExecutor)
require.NoError(t, err)
// Closed processes naturally cannot be reused anymore and thus shouldn't ever get
// cached.
reader.close()
- // Cancel the context such that the process will be considered for return to the
- // cache and wait for the cache to collect it.
- cache.cachedProcessDone.L.Lock()
+ // Cancel the process such that it will be considered for return to the cache.
cancel()
- defer cache.cachedProcessDone.L.Unlock()
- cache.cachedProcessDone.Wait()
require.Empty(t, keys(t, &cache.objectReaders))
})
@@ -312,62 +277,35 @@ func TestCache_ObjectInfoReader(t *testing.T) {
cache := newCache(time.Hour, 10, helper.NewManualTicker())
defer cache.Stop()
- cache.cachedProcessDone = sync.NewCond(&sync.Mutex{})
-
- t.Run("uncancellable", func(t *testing.T) {
- ctx := testhelper.ContextWithoutCancel()
- require.PanicsWithValue(t, "empty ctx.Done() in catfile.Batch.New()", func() {
- _, _ = cache.ObjectInfoReader(ctx, repoExecutor)
- })
- })
+ ctx := testhelper.Context(t)
t.Run("uncacheable", func(t *testing.T) {
- ctx, cancel := context.WithCancel(testhelper.Context(t))
-
// The context doesn't carry a session ID and is thus uncacheable.
// The process should never get returned to the cache and must be
// killed on context cancellation.
- reader, err := cache.ObjectInfoReader(ctx, repoExecutor)
+ reader, cancel, err := cache.ObjectInfoReader(ctx, repoExecutor)
require.NoError(t, err)
- objectInfoReaderImpl, ok := reader.(*objectInfoReader)
- require.True(t, ok, "expected object reader")
-
cancel()
- // We're cheating a bit here to avoid creating a racy test by reaching into the
- // process and trying to read from its stdout. If the cancel did kill the process as
- // expected, then the stdout should be closed and we'll get an EOF.
- output, err := io.ReadAll(objectInfoReaderImpl.queue.stdout)
- if err != nil {
- require.True(t, errors.Is(err, os.ErrClosed))
- } else {
- require.NoError(t, err)
- }
- require.Empty(t, output)
-
require.True(t, reader.isClosed())
require.Empty(t, keys(t, &cache.objectInfoReaders))
})
t.Run("cacheable", func(t *testing.T) {
defer cache.Evict()
- ctx, cancel := context.WithCancel(testhelper.Context(t))
- ctx = correlation.ContextWithCorrelation(ctx, "1")
+
+ ctx := correlation.ContextWithCorrelation(ctx, "1")
ctx = testhelper.MergeIncomingMetadata(ctx,
metadata.Pairs(SessionIDField, "1"),
)
- reader, err := cache.ObjectInfoReader(ctx, repoExecutor)
+ reader, cancel, err := cache.ObjectInfoReader(ctx, repoExecutor)
require.NoError(t, err)
- // Cancel the context such that the process will be considered for return to the
- // cache and wait for the cache to collect it.
- cache.cachedProcessDone.L.Lock()
+ // Cancel the process such it will be considered for return to the cache.
cancel()
- defer cache.cachedProcessDone.L.Unlock()
- cache.cachedProcessDone.Wait()
keys := keys(t, &cache.objectInfoReaders)
require.Equal(t, []key{{
@@ -383,24 +321,20 @@ func TestCache_ObjectInfoReader(t *testing.T) {
t.Run("closed process does not get cached", func(t *testing.T) {
defer cache.Evict()
- ctx, cancel := context.WithCancel(testhelper.Context(t))
- ctx = testhelper.MergeIncomingMetadata(ctx,
+
+ ctx := testhelper.MergeIncomingMetadata(ctx,
metadata.Pairs(SessionIDField, "1"),
)
- reader, err := cache.ObjectInfoReader(ctx, repoExecutor)
+ reader, cancel, err := cache.ObjectInfoReader(ctx, repoExecutor)
require.NoError(t, err)
// Closed processes naturally cannot be reused anymore and thus shouldn't ever get
// cached.
reader.close()
- // Cancel the context such that the process will be considered for return to the
- // cache and wait for the cache to collect it.
- cache.cachedProcessDone.L.Lock()
+ // Cancel the process such that it will be considered for return to the cache.
cancel()
- defer cache.cachedProcessDone.L.Unlock()
- cache.cachedProcessDone.Wait()
require.Empty(t, keys(t, &cache.objectInfoReaders))
})
diff --git a/internal/git/catfile/object_info_reader.go b/internal/git/catfile/object_info_reader.go
index 300459e81..e7773acd2 100644
--- a/internal/git/catfile/object_info_reader.go
+++ b/internal/git/catfile/object_info_reader.go
@@ -8,7 +8,6 @@ import (
"strings"
"sync/atomic"
- "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v14/internal/command"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
@@ -139,8 +138,6 @@ func newObjectInfoReader(
repo git.RepositoryExecutor,
counter *prometheus.CounterVec,
) (*objectInfoReader, error) {
- span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.ObjectInfoReader")
-
batchCmd, err := repo.Exec(ctx,
git.SubCmd{
Name: "cat-file",
@@ -163,12 +160,6 @@ func newObjectInfoReader(
stdin: bufio.NewWriter(batchCmd),
},
}
- go func() {
- <-ctx.Done()
- // This is crucial to prevent leaking file descriptors.
- objectInfoReader.close()
- span.Finish()
- }()
return objectInfoReader, nil
}
diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go
index 7020adcbe..8dedb1572 100644
--- a/internal/git/catfile/object_reader.go
+++ b/internal/git/catfile/object_reader.go
@@ -8,7 +8,6 @@ import (
"os"
"sync/atomic"
- "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v14/internal/command"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
@@ -61,8 +60,6 @@ func newObjectReader(
repo git.RepositoryExecutor,
counter *prometheus.CounterVec,
) (*objectReader, error) {
- span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.ObjectReader")
-
batchCmd, err := repo.Exec(ctx,
git.SubCmd{
Name: "cat-file",
@@ -86,11 +83,6 @@ func newObjectReader(
stdin: bufio.NewWriter(batchCmd),
},
}
- go func() {
- <-ctx.Done()
- objectReader.close()
- span.Finish()
- }()
return objectReader, nil
}
diff --git a/internal/git/catfile/testhelper_test.go b/internal/git/catfile/testhelper_test.go
index 9ad072f10..b7e7172cd 100644
--- a/internal/git/catfile/testhelper_test.go
+++ b/internal/git/catfile/testhelper_test.go
@@ -60,8 +60,9 @@ func setupObjectReader(t *testing.T, ctx context.Context) (config.Cfg, ObjectRea
cache := newCache(1*time.Hour, 1000, helper.NewTimerTicker(defaultEvictionInterval))
t.Cleanup(cache.Stop)
- objectReader, err := cache.ObjectReader(ctx, repoExecutor)
+ objectReader, cancel, err := cache.ObjectReader(ctx, repoExecutor)
require.NoError(t, err)
+ t.Cleanup(cancel)
return cfg, objectReader, repo
}
diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go
index 0c774d48d..80a1810d9 100644
--- a/internal/git/gitpipe/catfile_info_test.go
+++ b/internal/git/gitpipe/catfile_info_test.go
@@ -132,8 +132,9 @@ func TestCatfileInfo(t *testing.T) {
catfileCache := catfile.NewCache(cfg)
defer catfileCache.Stop()
- objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
it, err := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(tc.revlistInputs), tc.opts...)
require.NoError(t, err)
diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go
index fa74d2f06..95a9b3ad6 100644
--- a/internal/git/gitpipe/catfile_object_test.go
+++ b/internal/git/gitpipe/catfile_object_test.go
@@ -74,8 +74,9 @@ func TestCatfileObject(t *testing.T) {
catfileCache := catfile.NewCache(cfg)
defer catfileCache.Stop()
- objectReader, err := catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
it, err := CatfileObject(ctx, objectReader, NewCatfileInfoIterator(tc.catfileInfoInputs))
require.NoError(t, err)
diff --git a/internal/git/gitpipe/pipeline_test.go b/internal/git/gitpipe/pipeline_test.go
index 069bb5584..544ad552d 100644
--- a/internal/git/gitpipe/pipeline_test.go
+++ b/internal/git/gitpipe/pipeline_test.go
@@ -224,11 +224,13 @@ func TestPipeline_revlist(t *testing.T) {
catfileCache := catfile.NewCache(cfg)
defer catfileCache.Stop()
- objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
- objectReader, err := catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
revlistIter := Revlist(ctx, repo, tc.revisions, tc.revlistOptions...)
@@ -280,11 +282,13 @@ func TestPipeline_revlist(t *testing.T) {
catfileCache := catfile.NewCache(cfg)
defer catfileCache.Stop()
- objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancelReader, err := catfileCache.ObjectInfoReader(ctx, repo)
require.NoError(t, err)
+ defer cancelReader()
- objectReader, err := catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancelReader, err := catfileCache.ObjectReader(ctx, repo)
require.NoError(t, err)
+ defer cancelReader()
revlistIter := Revlist(ctx, repo, []string{"--all"})
@@ -320,11 +324,13 @@ func TestPipeline_revlist(t *testing.T) {
catfileCache := catfile.NewCache(cfg)
defer catfileCache.Stop()
- objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
- objectReader, err := catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
revlistIter := Revlist(ctx, repo, []string{"--all"}, WithObjects())
@@ -372,11 +378,13 @@ func TestPipeline_forEachRef(t *testing.T) {
catfileCache := catfile.NewCache(cfg)
defer catfileCache.Stop()
- objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
- objectReader, err := catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
forEachRefIter := ForEachRef(ctx, repo, nil)
diff --git a/internal/git/gittest/commit_test.go b/internal/git/gittest/commit_test.go
index 515455252..3494ad382 100644
--- a/internal/git/gittest/commit_test.go
+++ b/internal/git/gittest/commit_test.go
@@ -22,8 +22,9 @@ func TestWriteCommit(t *testing.T) {
catfileCache := catfile.NewCache(cfg)
defer catfileCache.Stop()
- objectReader, err := catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
defaultCommitter := &gitalypb.CommitAuthor{
Name: []byte("Scrooge McDuck"),
diff --git a/internal/git/localrepo/objects.go b/internal/git/localrepo/objects.go
index e5459d13b..fa37ab29e 100644
--- a/internal/git/localrepo/objects.go
+++ b/internal/git/localrepo/objects.go
@@ -226,10 +226,11 @@ func (repo *Repo) ReadCommit(ctx context.Context, revision git.Revision, opts ..
opt(&cfg)
}
- objectReader, err := repo.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := repo.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return nil, err
}
+ defer cancel()
var commit *gitalypb.GitCommit
if cfg.withTrailers {
diff --git a/internal/git/log/parser.go b/internal/git/log/parser.go
index f176fe140..7b7cf5d37 100644
--- a/internal/git/log/parser.go
+++ b/internal/git/log/parser.go
@@ -23,10 +23,10 @@ type Parser struct {
}
// NewParser returns a new Parser
-func NewParser(ctx context.Context, catfileCache catfile.Cache, repo git.RepositoryExecutor, src io.Reader) (*Parser, error) {
- objectReader, err := catfileCache.ObjectReader(ctx, repo)
+func NewParser(ctx context.Context, catfileCache catfile.Cache, repo git.RepositoryExecutor, src io.Reader) (*Parser, func(), error) {
+ objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo)
if err != nil {
- return nil, err
+ return nil, nil, err
}
parser := &Parser{
@@ -34,7 +34,7 @@ func NewParser(ctx context.Context, catfileCache catfile.Cache, repo git.Reposit
objectReader: objectReader,
}
- return parser, nil
+ return parser, cancel, nil
}
// Parse parses a single git log line. It returns true if successful, false if it finished
diff --git a/internal/gitaly/service/blob/blobs.go b/internal/gitaly/service/blob/blobs.go
index 50a8a6274..86acae6a2 100644
--- a/internal/gitaly/service/blob/blobs.go
+++ b/internal/gitaly/service/blob/blobs.go
@@ -98,10 +98,11 @@ func (s *server) processBlobs(
// object iterator. We thus support an optional `catfileInfoIter` parameter: if set,
// we just use that one and ignore the object iterator.
if catfileInfoIter == nil {
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(ctx, repo)
if err != nil {
return helper.ErrInternal(fmt.Errorf("creating object info reader: %w", err))
}
+ defer cancel()
catfileInfoIter, err = gitpipe.CatfileInfo(ctx, objectInfoReader, objectIter)
if err != nil {
@@ -132,10 +133,11 @@ func (s *server) processBlobs(
return helper.ErrInternal(err)
}
} else {
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err))
}
+ defer cancel()
catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, objectIter)
if err != nil {
diff --git a/internal/gitaly/service/blob/get_blob.go b/internal/gitaly/service/blob/get_blob.go
index 99ce66873..d7784ea7b 100644
--- a/internal/gitaly/service/blob/get_blob.go
+++ b/internal/gitaly/service/blob/get_blob.go
@@ -20,10 +20,11 @@ func (s *server) GetBlob(in *gitalypb.GetBlobRequest, stream gitalypb.BlobServic
return helper.ErrInvalidArgumentf("GetBlob: %v", err)
}
- objectReader, err := s.catfileCache.ObjectReader(stream.Context(), repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(stream.Context(), repo)
if err != nil {
return helper.ErrInternalf("GetBlob: %v", err)
}
+ defer cancel()
blob, err := objectReader.Object(ctx, git.Revision(in.Oid))
if err != nil {
diff --git a/internal/gitaly/service/blob/get_blobs.go b/internal/gitaly/service/blob/get_blobs.go
index 5b899ebe7..237c85c17 100644
--- a/internal/gitaly/service/blob/get_blobs.go
+++ b/internal/gitaly/service/blob/get_blobs.go
@@ -159,15 +159,17 @@ func (s *server) GetBlobs(req *gitalypb.GetBlobsRequest, stream gitalypb.BlobSer
repo := s.localrepo(req.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(stream.Context(), repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(stream.Context(), repo)
if err != nil {
return err
}
+ defer cancel()
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(stream.Context(), repo)
+ objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(stream.Context(), repo)
if err != nil {
return err
}
+ defer cancel()
return sendGetBlobsResponse(req, stream, objectReader, objectInfoReader)
}
diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go
index ca05b4750..57027d4c8 100644
--- a/internal/gitaly/service/blob/lfs_pointers.go
+++ b/internal/gitaly/service/blob/lfs_pointers.go
@@ -52,10 +52,11 @@ func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream git
repo := s.localrepo(in.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err))
}
+ defer cancel()
revlistIter := gitpipe.Revlist(ctx, repo, in.GetRevisions(),
gitpipe.WithObjects(),
@@ -94,10 +95,11 @@ func (s *server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stre
},
})
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err))
}
+ defer cancel()
catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo,
gitpipe.WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool {
@@ -137,15 +139,17 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita
},
})
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(ctx, repo)
if err != nil {
return helper.ErrInternal(fmt.Errorf("creating object info reader: %w", err))
}
+ defer cancel()
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err))
}
+ defer cancel()
blobs := make([]gitpipe.RevisionResult, len(req.GetBlobIds()))
for i, blobID := range req.GetBlobIds() {
@@ -160,6 +164,7 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita
if err != nil {
return helper.ErrInternalf("creating object info iterator: %w", err)
}
+ defer cancel()
catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
if err != nil {
diff --git a/internal/gitaly/service/cleanup/apply_bfg_object_map_stream.go b/internal/gitaly/service/cleanup/apply_bfg_object_map_stream.go
index d10c483d1..bcd015bab 100644
--- a/internal/gitaly/service/cleanup/apply_bfg_object_map_stream.go
+++ b/internal/gitaly/service/cleanup/apply_bfg_object_map_stream.go
@@ -40,10 +40,11 @@ func (s *server) ApplyBfgObjectMapStream(server gitalypb.CleanupService_ApplyBfg
reader := &bfgStreamReader{firstRequest: firstRequest, server: server}
chunker := chunk.New(&bfgStreamWriter{server: server})
- notifier, err := notifier.New(ctx, s.catfileCache, repo, chunker)
+ notifier, cancel, err := notifier.New(ctx, s.catfileCache, repo, chunker)
if err != nil {
return helper.ErrInternal(err)
}
+ defer cancel()
// It doesn't matter if new internal references are added after this RPC
// starts running - they shouldn't point to the objects removed by the BFG
diff --git a/internal/gitaly/service/cleanup/notifier/notifier.go b/internal/gitaly/service/cleanup/notifier/notifier.go
index 6b6acff88..140b29eae 100644
--- a/internal/gitaly/service/cleanup/notifier/notifier.go
+++ b/internal/gitaly/service/cleanup/notifier/notifier.go
@@ -17,13 +17,13 @@ type Notifier struct {
}
// New instantiates a new Notifier
-func New(ctx context.Context, catfileCache catfile.Cache, repo git.RepositoryExecutor, chunker *chunk.Chunker) (*Notifier, error) {
- objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo)
+func New(ctx context.Context, catfileCache catfile.Cache, repo git.RepositoryExecutor, chunker *chunk.Chunker) (*Notifier, func(), error) {
+ objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo)
if err != nil {
- return nil, err
+ return nil, nil, err
}
- return &Notifier{objectInfoReader: objectInfoReader, chunker: chunker}, nil
+ return &Notifier{objectInfoReader: objectInfoReader, chunker: chunker}, cancel, nil
}
// Notify builds a new message and sends it to the chunker
diff --git a/internal/gitaly/service/commit/check_objects_exist.go b/internal/gitaly/service/commit/check_objects_exist.go
index baab66f51..83512b340 100644
--- a/internal/gitaly/service/commit/check_objects_exist.go
+++ b/internal/gitaly/service/commit/check_objects_exist.go
@@ -26,13 +26,14 @@ func (s *server) CheckObjectsExist(
return err
}
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(
+ objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(
ctx,
s.localrepo(request.GetRepository()),
)
if err != nil {
return err
}
+ defer cancel()
chunker := chunk.New(&checkObjectsExistSender{stream: stream})
for {
diff --git a/internal/gitaly/service/commit/commit_messages.go b/internal/gitaly/service/commit/commit_messages.go
index 25b334e9e..0e9424f52 100644
--- a/internal/gitaly/service/commit/commit_messages.go
+++ b/internal/gitaly/service/commit/commit_messages.go
@@ -27,10 +27,11 @@ func (s *server) getAndStreamCommitMessages(request *gitalypb.GetCommitMessagesR
ctx := stream.Context()
repo := s.localrepo(request.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return err
}
+ defer cancel()
for _, commitID := range request.GetCommitIds() {
msg, err := catfile.GetCommitMessage(ctx, objectReader, repo, git.Revision(commitID))
diff --git a/internal/gitaly/service/commit/commit_signatures.go b/internal/gitaly/service/commit/commit_signatures.go
index f2e3a7ac4..91ea5bf7c 100644
--- a/internal/gitaly/service/commit/commit_signatures.go
+++ b/internal/gitaly/service/commit/commit_signatures.go
@@ -30,10 +30,11 @@ func (s *server) getCommitSignatures(request *gitalypb.GetCommitSignaturesReques
ctx := stream.Context()
repo := s.localrepo(request.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternal(err)
}
+ defer cancel()
for _, commitID := range request.CommitIds {
commitObj, err := objectReader.Object(ctx, git.Revision(commitID)+"^{commit}")
diff --git a/internal/gitaly/service/commit/commits_helper.go b/internal/gitaly/service/commit/commits_helper.go
index cb5715992..2287d9674 100644
--- a/internal/gitaly/service/commit/commits_helper.go
+++ b/internal/gitaly/service/commit/commits_helper.go
@@ -29,10 +29,11 @@ func (s *server) sendCommits(
return err
}
- logParser, err := log.NewParser(ctx, s.catfileCache, repo, cmd)
+ logParser, cancel, err := log.NewParser(ctx, s.catfileCache, repo, cmd)
if err != nil {
return err
}
+ defer cancel()
chunker := chunk.New(sender)
for logParser.Parse(ctx) {
diff --git a/internal/gitaly/service/commit/filter_shas_with_signatures.go b/internal/gitaly/service/commit/filter_shas_with_signatures.go
index 09d22d881..3caa401fe 100644
--- a/internal/gitaly/service/commit/filter_shas_with_signatures.go
+++ b/internal/gitaly/service/commit/filter_shas_with_signatures.go
@@ -38,10 +38,11 @@ func (s *server) filterShasWithSignatures(bidi gitalypb.CommitService_FilterShas
ctx := bidi.Context()
repo := s.localrepo(firstRequest.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return err
}
+ defer cancel()
request := firstRequest
for {
diff --git a/internal/gitaly/service/commit/find_commits.go b/internal/gitaly/service/commit/find_commits.go
index 8d6aaf048..12c737b52 100644
--- a/internal/gitaly/service/commit/find_commits.go
+++ b/internal/gitaly/service/commit/find_commits.go
@@ -58,10 +58,11 @@ func (s *server) findCommits(ctx context.Context, req *gitalypb.FindCommitsReque
return fmt.Errorf("error when creating git log command: %v", err)
}
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return fmt.Errorf("creating catfile: %v", err)
}
+ defer cancel()
getCommits := NewGetCommits(logCmd, objectReader)
diff --git a/internal/gitaly/service/commit/last_commit_for_path.go b/internal/gitaly/service/commit/last_commit_for_path.go
index 9b8468c92..a95bfdcf3 100644
--- a/internal/gitaly/service/commit/last_commit_for_path.go
+++ b/internal/gitaly/service/commit/last_commit_for_path.go
@@ -29,10 +29,11 @@ func (s *server) lastCommitForPath(ctx context.Context, in *gitalypb.LastCommitF
}
repo := s.localrepo(in.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return nil, err
}
+ defer cancel()
options := in.GetGlobalOptions()
diff --git a/internal/gitaly/service/commit/list_all_commits.go b/internal/gitaly/service/commit/list_all_commits.go
index d5fa18622..1709f5cf6 100644
--- a/internal/gitaly/service/commit/list_all_commits.go
+++ b/internal/gitaly/service/commit/list_all_commits.go
@@ -30,10 +30,11 @@ func (s *server) ListAllCommits(
ctx := stream.Context()
repo := s.localrepo(request.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternalf("creating object reader: %w", err)
}
+ defer cancel()
// If we've got a pagination token, then we will only start to print commits as soon as
// we've seen the token.
diff --git a/internal/gitaly/service/commit/list_commits.go b/internal/gitaly/service/commit/list_commits.go
index 826edb9ee..830e1bf46 100644
--- a/internal/gitaly/service/commit/list_commits.go
+++ b/internal/gitaly/service/commit/list_commits.go
@@ -39,10 +39,11 @@ func (s *server) ListCommits(
ctx := stream.Context()
repo := s.localrepo(request.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err))
}
+ defer cancel()
revlistOptions := []gitpipe.RevlistOption{}
diff --git a/internal/gitaly/service/commit/list_commits_by_oid.go b/internal/gitaly/service/commit/list_commits_by_oid.go
index 7ef89f58a..4c736b2d8 100644
--- a/internal/gitaly/service/commit/list_commits_by_oid.go
+++ b/internal/gitaly/service/commit/list_commits_by_oid.go
@@ -25,10 +25,11 @@ func (s *server) ListCommitsByOid(in *gitalypb.ListCommitsByOidRequest, stream g
ctx := stream.Context()
repo := s.localrepo(in.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return err
}
+ defer cancel()
sender := chunk.New(&commitsByOidSender{stream: stream})
listCommitsbyOidHistogram.Observe(float64(len(in.Oid)))
diff --git a/internal/gitaly/service/commit/list_commits_by_ref_name.go b/internal/gitaly/service/commit/list_commits_by_ref_name.go
index f6d84e1cd..5b0784f77 100644
--- a/internal/gitaly/service/commit/list_commits_by_ref_name.go
+++ b/internal/gitaly/service/commit/list_commits_by_ref_name.go
@@ -13,10 +13,11 @@ func (s *server) ListCommitsByRefName(in *gitalypb.ListCommitsByRefNameRequest,
ctx := stream.Context()
repo := s.localrepo(in.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternal(err)
}
+ defer cancel()
sender := chunk.New(&commitsByRefNameSender{stream: stream})
diff --git a/internal/gitaly/service/commit/list_last_commits_for_tree.go b/internal/gitaly/service/commit/list_last_commits_for_tree.go
index 99c3ea0a9..9d0a7a64e 100644
--- a/internal/gitaly/service/commit/list_last_commits_for_tree.go
+++ b/internal/gitaly/service/commit/list_last_commits_for_tree.go
@@ -35,10 +35,11 @@ func (s *server) listLastCommitsForTree(in *gitalypb.ListLastCommitsForTreeReque
ctx := stream.Context()
repo := s.localrepo(in.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return err
}
+ defer cancel()
batch := make([]*gitalypb.ListLastCommitsForTreeResponse_CommitForTree, 0, maxNumStatBatchSize)
entries, err := getLSTreeEntries(parser)
diff --git a/internal/gitaly/service/commit/tree_entries.go b/internal/gitaly/service/commit/tree_entries.go
index 96bf79693..ab252eecf 100644
--- a/internal/gitaly/service/commit/tree_entries.go
+++ b/internal/gitaly/service/commit/tree_entries.go
@@ -147,16 +147,19 @@ func (s *server) sendTreeEntries(
}
} else {
var err error
+ var cancel func()
- objectReader, err = s.catfileCache.ObjectReader(stream.Context(), repo)
+ objectReader, cancel, err = s.catfileCache.ObjectReader(stream.Context(), repo)
if err != nil {
return err
}
+ defer cancel()
- objectInfoReader, err = s.catfileCache.ObjectInfoReader(stream.Context(), repo)
+ objectInfoReader, cancel, err = s.catfileCache.ObjectInfoReader(stream.Context(), repo)
if err != nil {
return err
}
+ defer cancel()
entries, err = catfile.TreeEntries(ctx, objectReader, objectInfoReader, revision, path)
if err != nil {
diff --git a/internal/gitaly/service/commit/tree_entry.go b/internal/gitaly/service/commit/tree_entry.go
index 2573fe3c0..9203931c4 100644
--- a/internal/gitaly/service/commit/tree_entry.go
+++ b/internal/gitaly/service/commit/tree_entry.go
@@ -132,15 +132,17 @@ func (s *server) TreeEntry(in *gitalypb.TreeEntryRequest, stream gitalypb.Commit
requestPath = strings.TrimRight(requestPath, "/")
}
- objectReader, err := s.catfileCache.ObjectReader(stream.Context(), repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(stream.Context(), repo)
if err != nil {
return err
}
+ defer cancel()
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(stream.Context(), repo)
+ objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(stream.Context(), repo)
if err != nil {
return err
}
+ defer cancel()
return sendTreeEntry(stream, objectReader, objectInfoReader, string(in.GetRevision()), requestPath, in.GetLimit(), in.GetMaxSize())
}
diff --git a/internal/gitaly/service/operations/tags.go b/internal/gitaly/service/operations/tags.go
index 68acb036a..678e1d68c 100644
--- a/internal/gitaly/service/operations/tags.go
+++ b/internal/gitaly/service/operations/tags.go
@@ -178,15 +178,17 @@ func (s *Server) createTag(
committer *gitalypb.User,
committerTime time.Time,
) (*gitalypb.Tag, git.ObjectID, error) {
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return nil, "", status.Error(codes.Internal, err.Error())
}
+ defer cancel()
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(ctx, repo)
if err != nil {
return nil, "", status.Error(codes.Internal, err.Error())
}
+ defer cancel()
// We allow all ways to name a revision that cat-file
// supports, not just OID. Resolve it.
diff --git a/internal/gitaly/service/ref/find_all_tags.go b/internal/gitaly/service/ref/find_all_tags.go
index 4bfb2d0e9..1bf51fde9 100644
--- a/internal/gitaly/service/ref/find_all_tags.go
+++ b/internal/gitaly/service/ref/find_all_tags.go
@@ -40,10 +40,11 @@ func (s *server) FindAllTags(in *gitalypb.FindAllTagsRequest, stream gitalypb.Re
}
func (s *server) findAllTags(ctx context.Context, repo *localrepo.Repo, sortField string, stream gitalypb.RefService_FindAllTagsServer, opts *paginationOpts) error {
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return fmt.Errorf("error creating object reader: %v", err)
}
+ defer cancel()
forEachRefIter := gitpipe.ForEachRef(
ctx,
diff --git a/internal/gitaly/service/ref/find_all_tags_test.go b/internal/gitaly/service/ref/find_all_tags_test.go
index 487790b7d..b562d74fe 100644
--- a/internal/gitaly/service/ref/find_all_tags_test.go
+++ b/internal/gitaly/service/ref/find_all_tags_test.go
@@ -403,11 +403,13 @@ func TestFindAllTags_nestedTags(t *testing.T) {
catfileCache := catfile.NewCache(cfg)
defer catfileCache.Stop()
- objectReader, err := catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
- objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
info, err := objectInfoReader.Info(ctx, git.Revision(tc.originalOid))
require.NoError(t, err)
diff --git a/internal/gitaly/service/ref/refs.go b/internal/gitaly/service/ref/refs.go
index 63df4f665..76cebb1b8 100644
--- a/internal/gitaly/service/ref/refs.go
+++ b/internal/gitaly/service/ref/refs.go
@@ -109,10 +109,11 @@ func (s *server) findLocalBranches(in *gitalypb.FindLocalBranchesRequest, stream
ctx := stream.Context()
repo := s.localrepo(in.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return err
}
+ defer cancel()
writer := newFindLocalBranchesWriter(stream, objectReader)
opts := buildFindRefsOpts(ctx, in.GetPaginationParams())
@@ -161,10 +162,11 @@ func (s *server) findAllBranches(in *gitalypb.FindAllBranchesRequest, stream git
}
ctx := stream.Context()
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return err
}
+ defer cancel()
opts := buildFindRefsOpts(ctx, nil)
opts.cmdArgs = args
@@ -240,10 +242,11 @@ func (s *server) findTag(ctx context.Context, repo git.RepositoryExecutor, tagNa
return nil, fmt.Errorf("for-each-ref error: %v", err)
}
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return nil, err
}
+ defer cancel()
var tag *gitalypb.Tag
diff --git a/internal/gitaly/service/ref/refs_test.go b/internal/gitaly/service/ref/refs_test.go
index 7278f7767..bac8b3e81 100644
--- a/internal/gitaly/service/ref/refs_test.go
+++ b/internal/gitaly/service/ref/refs_test.go
@@ -1129,11 +1129,13 @@ func TestFindTagNestedTag(t *testing.T) {
catfileCache := catfile.NewCache(cfg)
defer catfileCache.Stop()
- objectReader, err := catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
- objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo)
require.NoError(t, err)
+ defer cancel()
info, err := objectInfoReader.Info(ctx, git.Revision(tc.originalOid))
require.NoError(t, err)
diff --git a/internal/gitaly/service/ref/remote_branches.go b/internal/gitaly/service/ref/remote_branches.go
index 57ed60999..343950e87 100644
--- a/internal/gitaly/service/ref/remote_branches.go
+++ b/internal/gitaly/service/ref/remote_branches.go
@@ -31,10 +31,11 @@ func (s *server) findAllRemoteBranches(req *gitalypb.FindAllRemoteBranchesReques
patterns := []string{"refs/remotes/" + req.GetRemoteName()}
ctx := stream.Context()
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return err
}
+ defer cancel()
opts := buildFindRefsOpts(ctx, nil)
opts.cmdArgs = args
diff --git a/internal/gitaly/service/ref/tag_messages.go b/internal/gitaly/service/ref/tag_messages.go
index 6f885eded..895a5098b 100644
--- a/internal/gitaly/service/ref/tag_messages.go
+++ b/internal/gitaly/service/ref/tag_messages.go
@@ -37,10 +37,11 @@ func (s *server) getAndStreamTagMessages(request *gitalypb.GetTagMessagesRequest
ctx := stream.Context()
repo := s.localrepo(request.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return err
}
+ defer cancel()
for _, tagID := range request.GetTagIds() {
tag, err := catfile.GetTag(ctx, objectReader, git.Revision(tagID), "")
diff --git a/internal/gitaly/service/ref/tag_signatures.go b/internal/gitaly/service/ref/tag_signatures.go
index 8bdf78479..7b05640d2 100644
--- a/internal/gitaly/service/ref/tag_signatures.go
+++ b/internal/gitaly/service/ref/tag_signatures.go
@@ -39,10 +39,11 @@ func (s *server) GetTagSignatures(req *gitalypb.GetTagSignaturesRequest, stream
ctx := stream.Context()
repo := s.localrepo(req.GetRepository())
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternalf("creating object reader: %w", err)
}
+ defer cancel()
chunker := chunk.New(&tagSignatureSender{
send: func(signatures []*gitalypb.GetTagSignaturesResponse_TagSignature) error {
diff --git a/internal/gitaly/service/repository/apply_gitattributes.go b/internal/gitaly/service/repository/apply_gitattributes.go
index 5167176e7..f2ec2e422 100644
--- a/internal/gitaly/service/repository/apply_gitattributes.go
+++ b/internal/gitaly/service/repository/apply_gitattributes.go
@@ -132,10 +132,11 @@ func (s *server) ApplyGitattributes(ctx context.Context, in *gitalypb.ApplyGitat
return nil, helper.ErrInvalidArgumentf("revision: %v", err)
}
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return nil, err
}
+ defer cancel()
if err := s.applyGitattributes(ctx, repo, objectReader, repoPath, in.GetRevision()); err != nil {
return nil, err
diff --git a/internal/gitaly/service/repository/archive.go b/internal/gitaly/service/repository/archive.go
index 878572f91..6c37e1b2c 100644
--- a/internal/gitaly/service/repository/archive.go
+++ b/internal/gitaly/service/repository/archive.go
@@ -147,15 +147,17 @@ func (s *server) validateGetArchivePrecondition(
path string,
exclude []string,
) error {
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return err
}
+ defer cancel()
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(ctx, repo)
if err != nil {
return err
}
+ defer cancel()
f := catfile.NewTreeEntryFinder(objectReader, objectInfoReader)
if path != "." {
diff --git a/internal/gitaly/service/repository/gc.go b/internal/gitaly/service/repository/gc.go
index 6e9d3533f..09117f516 100644
--- a/internal/gitaly/service/repository/gc.go
+++ b/internal/gitaly/service/repository/gc.go
@@ -96,10 +96,11 @@ func (s *server) cleanupKeepArounds(ctx context.Context, repo *localrepo.Repo) e
return nil
}
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo)
+ objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(ctx, repo)
if err != nil {
return nil
}
+ defer cancel()
keepAroundsPrefix := "refs/keep-around"
keepAroundsPath := filepath.Join(repoPath, keepAroundsPrefix)
diff --git a/internal/gitaly/service/repository/raw_changes.go b/internal/gitaly/service/repository/raw_changes.go
index fb1c2c5a1..5c3184b88 100644
--- a/internal/gitaly/service/repository/raw_changes.go
+++ b/internal/gitaly/service/repository/raw_changes.go
@@ -20,10 +20,11 @@ func (s *server) GetRawChanges(req *gitalypb.GetRawChangesRequest, stream gitaly
ctx := stream.Context()
repo := s.localrepo(req.GetRepository())
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(stream.Context(), repo)
+ objectInfoReader, cancel, err := s.catfileCache.ObjectInfoReader(stream.Context(), repo)
if err != nil {
return helper.ErrInternal(err)
}
+ defer cancel()
if err := validateRawChangesRequest(ctx, req, objectInfoReader); err != nil {
return helper.ErrInvalidArgument(err)