Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Stan Hu <stanhu@gmail.com> 2021-09-21 19:13:46 +0300
committer: Stan Hu <stanhu@gmail.com> 2021-09-21 19:14:09 +0300
commit: c5a32bd5a681b448c5aae248a5adfe332037eaca (patch)
tree: f8d7d96fc71fb5faddda7d701b8b4e42689f68ca /internal
parent: f8946a7c774dc0a2cffec5ec3456024fca6d808c (diff)
Revert catfile cache refactor
This reverts https://gitlab.com/gitlab-org/gitaly/-/merge_requests/3853 since we believe it's leading to a Goroutine leak in https://gitlab.com/gitlab-com/gl-infra/production/-/issues/5566.
Diffstat (limited to 'internal')
-rw-r--r-- internal/command/command.go 10
-rw-r--r-- internal/git/catfile/batch.go 220
-rw-r--r-- internal/git/catfile/batch_cache.go 119
-rw-r--r-- internal/git/catfile/batch_cache_test.go 278
-rw-r--r-- internal/git/catfile/batch_check_process.go 67
-rw-r--r-- internal/git/catfile/batch_process.go 148
-rw-r--r-- internal/git/catfile/batch_test.go 39
-rw-r--r-- internal/git/catfile/object.go 13
-rw-r--r-- internal/git/catfile/object_info_reader.go 143
-rw-r--r-- internal/git/catfile/object_info_reader_test.go 164
-rw-r--r-- internal/git/catfile/object_reader.go 174
-rw-r--r-- internal/git/catfile/object_reader_test.go 137
-rw-r--r-- internal/git/catfile/objectinfo.go 65
-rw-r--r-- internal/git/catfile/objectinfo_fuzz.go (renamed from internal/git/catfile/object_info_reader_fuzz.go) 0
-rw-r--r-- internal/git/catfile/objectinfo_test.go 68
-rw-r--r-- internal/git/catfile/testhelper_test.go 29
-rw-r--r-- internal/git/catfile/tracing.go 27
17 files changed, 623 insertions, 1078 deletions
diff --git a/internal/command/command.go b/internal/command/command.go
index d1db84cd1..eb2d494be 100644
--- a/internal/command/command.go
+++ b/internal/command/command.go
@@ -234,6 +234,7 @@ func New(ctx context.Context, cmd *exec.Cmd, stdin io.Reader, stdout, stderr io.
syscall.Kill(-process.Pid, syscall.SIGTERM)
}
command.Wait()
+ wg.Done()
}()
logPid = cmd.Process.Pid
@@ -276,15 +277,6 @@ func (c *Command) wait() {
inFlightCommandGauge.Dec()
c.logProcessComplete()
-
- // This is a bit out-of-place here given that the `wg.Add()` call is in `New()`.
- // But in `New()`, we have to resort waiting on the context being finished until we
- // would be able to decrement the number of in-flight commands. Given that in some
- // cases we detach processes from their initial context such that they run in the
- // background, this would cause us to take longer than necessary to decrease the
- // wait group counter again. So we instead do it here to accelerate the process,
- // even though it's less idiomatic.
- wg.Done()
}
// ExitStatus will return the exit-code from an error returned by Wait().
diff --git a/internal/git/catfile/batch.go b/internal/git/catfile/batch.go
index 5c11ad015..8bf739775 100644
--- a/internal/git/catfile/batch.go
+++ b/internal/git/catfile/batch.go
@@ -7,6 +7,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
+ "gitlab.com/gitlab-org/labkit/correlation"
)
const (
@@ -28,69 +29,17 @@ type Batch interface {
}
type batch struct {
- *objectInfoReader
- *objectReader
-
- closedMutex sync.Mutex
- closed bool
-}
-
-func newBatch(
- ctx context.Context,
- repo git.RepositoryExecutor,
- counter *prometheus.CounterVec,
-) (_ *batch, returnedErr error) {
- span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.Batch")
- go func() {
- <-ctx.Done()
- span.Finish()
- }()
-
- objectReader, err := newObjectReader(ctx, repo, counter)
- if err != nil {
- return nil, err
- }
- defer func() {
- // If creation of the ObjectInfoReader fails, then we do not want to leak the
- // ObjectReader process.
- if returnedErr != nil {
- objectReader.Close()
- }
- }()
-
- objectInfoReader, err := newObjectInfoReader(ctx, repo, counter)
- if err != nil {
- return nil, err
- }
-
- return &batch{objectReader: objectReader, objectInfoReader: objectInfoReader}, nil
-}
-
-// Close closes the writers for objectInfoReader and objectReader. This is only used for cached
-// Batches
-func (c *batch) Close() {
- c.closedMutex.Lock()
- defer c.closedMutex.Unlock()
-
- if c.closed {
- return
- }
-
- c.closed = true
- c.objectReader.Close()
- c.objectInfoReader.Close()
-}
-
-func (c *batch) isClosed() bool {
- c.closedMutex.Lock()
- defer c.closedMutex.Unlock()
- return c.closed
+ sync.Mutex
+ *batchCheckProcess
+ *batchProcess
+ cancel func()
+ closed bool
}
// Info returns an ObjectInfo if spec exists. If the revision does not exist
// the error is of type NotFoundError.
func (c *batch) Info(ctx context.Context, revision git.Revision) (*ObjectInfo, error) {
- return c.objectInfoReader.info(ctx, revision)
+ return c.batchCheckProcess.info(revision)
}
// Tree returns a raw tree object. It is an error if the revision does not
@@ -98,7 +47,7 @@ func (c *batch) Info(ctx context.Context, revision git.Revision) (*ObjectInfo, e
// the object type. Caller must consume the Reader before making another call
// on C.
func (c *batch) Tree(ctx context.Context, revision git.Revision) (*Object, error) {
- return c.objectReader.reader(ctx, revision, "tree")
+ return c.batchProcess.reader(revision, "tree")
}
// Commit returns a raw commit object. It is an error if the revision does not
@@ -106,7 +55,7 @@ func (c *batch) Tree(ctx context.Context, revision git.Revision) (*Object, error
// check the object type. Caller must consume the Reader before making another
// call on C.
func (c *batch) Commit(ctx context.Context, revision git.Revision) (*Object, error) {
- return c.objectReader.reader(ctx, revision, "commit")
+ return c.batchProcess.reader(revision, "commit")
}
// Blob returns a reader for the requested blob. The entire blob must be
@@ -115,11 +64,158 @@ func (c *batch) Commit(ctx context.Context, revision git.Revision) (*Object, err
// It is an error if the revision does not point to a blob. To prevent this,
// use Info to resolve the revision and check the object type.
func (c *batch) Blob(ctx context.Context, revision git.Revision) (*Object, error) {
- return c.objectReader.reader(ctx, revision, "blob")
+ return c.batchProcess.reader(revision, "blob")
}
// Tag returns a raw tag object. Caller must consume the Reader before
// making another call on C.
func (c *batch) Tag(ctx context.Context, revision git.Revision) (*Object, error) {
- return c.objectReader.reader(ctx, revision, "tag")
+ return c.batchProcess.reader(revision, "tag")
+}
+
+// Close closes the writers for batchCheckProcess and batchProcess. This is only used for cached
+// Batches
+func (c *batch) Close() {
+ c.Lock()
+ defer c.Unlock()
+
+ if c.closed {
+ return
+ }
+
+ c.closed = true
+ if c.cancel != nil {
+ // both c.batchProcess and c.batchCheckProcess have goroutines that listen on
+ // ctx.Done() when this is cancelled, it will cause those goroutines to close both
+ // writers
+ c.cancel()
+ }
+}
+
+func (c *batch) isClosed() bool {
+ c.Lock()
+ defer c.Unlock()
+ return c.closed
+}
+
+type simulatedBatchSpawnError struct{}
+
+func (simulatedBatchSpawnError) Error() string { return "simulated spawn error" }
+
+func (bc *BatchCache) newBatch(ctx context.Context, repo git.RepositoryExecutor) (*batch, context.Context, error) {
+ var err error
+
+ // batch processes are long-lived and reused across RPCs,
+ // so we de-correlate the process from the RPC
+ ctx = correlation.ContextWithCorrelation(ctx, "")
+ ctx = opentracing.ContextWithSpan(ctx, nil)
+ span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.Batch")
+
+ ctx, cancel := context.WithCancel(ctx)
+ defer func() {
+ if err != nil {
+ cancel()
+ }
+ }()
+
+ go func() {
+ <-ctx.Done()
+ span.Finish()
+ }()
+
+ batchProcess, err := bc.newBatchProcess(ctx, repo)
+ if err != nil {
+ return nil, ctx, err
+ }
+
+ batchCheckProcess, err := bc.newBatchCheckProcess(ctx, repo)
+ if err != nil {
+ return nil, ctx, err
+ }
+
+ return &batch{batchProcess: batchProcess, batchCheckProcess: batchCheckProcess}, ctx, nil
+}
+
+func newInstrumentedBatch(ctx context.Context, c Batch, catfileLookupCounter *prometheus.CounterVec) Batch {
+ return &instrumentedBatch{
+ Batch: c,
+ catfileLookupCounter: catfileLookupCounter,
+ batchCtx: ctx,
+ }
+}
+
+// We maintain two contexts here: the one RPC-level one, and one batch-level one.
+//
+// The batchCtx tracks the lifetime of the long-running batch process, and is
+// de-correlated from the RPC, as it is shared between many RPCs.
+//
+// We perform double accounting and re-correlation to get stats and traces per
+// batch process.
+type instrumentedBatch struct {
+ Batch
+ catfileLookupCounter *prometheus.CounterVec
+ batchCtx context.Context
+}
+
+func (ib *instrumentedBatch) Info(ctx context.Context, revision git.Revision) (*ObjectInfo, error) {
+ ctx, finish := ib.startSpan(ctx, "Batch.Info", revision)
+ defer finish()
+
+ ib.catfileLookupCounter.WithLabelValues("info").Inc()
+
+ return ib.Batch.Info(ctx, revision)
+}
+
+func (ib *instrumentedBatch) Tree(ctx context.Context, revision git.Revision) (*Object, error) {
+ ctx, finish := ib.startSpan(ctx, "Batch.Tree", revision)
+ defer finish()
+
+ ib.catfileLookupCounter.WithLabelValues("tree").Inc()
+
+ return ib.Batch.Tree(ctx, revision)
+}
+
+func (ib *instrumentedBatch) Commit(ctx context.Context, revision git.Revision) (*Object, error) {
+ ctx, finish := ib.startSpan(ctx, "Batch.Commit", revision)
+ defer finish()
+
+ ib.catfileLookupCounter.WithLabelValues("commit").Inc()
+
+ return ib.Batch.Commit(ctx, revision)
+}
+
+func (ib *instrumentedBatch) Blob(ctx context.Context, revision git.Revision) (*Object, error) {
+ ctx, finish := ib.startSpan(ctx, "Batch.Blob", revision)
+ defer finish()
+
+ ib.catfileLookupCounter.WithLabelValues("blob").Inc()
+
+ return ib.Batch.Blob(ctx, revision)
+}
+
+func (ib *instrumentedBatch) Tag(ctx context.Context, revision git.Revision) (*Object, error) {
+ ctx, finish := ib.startSpan(ctx, "Batch.Tag", revision)
+ defer finish()
+
+ ib.catfileLookupCounter.WithLabelValues("tag").Inc()
+
+ return ib.Batch.Tag(ctx, revision)
+}
+
+func (ib *instrumentedBatch) revisionTag(revision git.Revision) opentracing.Tag {
+ return opentracing.Tag{Key: "revision", Value: revision}
+}
+
+func (ib *instrumentedBatch) correlationIDTag(ctx context.Context) opentracing.Tag {
+ return opentracing.Tag{Key: "correlation_id", Value: correlation.ExtractFromContext(ctx)}
+}
+
+func (ib *instrumentedBatch) startSpan(ctx context.Context, methodName string, revision git.Revision) (context.Context, func()) {
+ span, ctx := opentracing.StartSpanFromContext(ctx, methodName, ib.revisionTag(revision))
+ span2, _ := opentracing.StartSpanFromContext(ib.batchCtx, methodName, ib.revisionTag(revision), ib.correlationIDTag(ctx))
+
+ return ctx, func() {
+ span.Finish()
+ span2.Finish()
+ }
}
diff --git a/internal/git/catfile/batch_cache.go b/internal/git/catfile/batch_cache.go
index b8e1f966a..336d9db3d 100644
--- a/internal/git/catfile/batch_cache.go
+++ b/internal/git/catfile/batch_cache.go
@@ -6,13 +6,11 @@ import (
"sync"
"time"
- "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/repository"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata"
- "gitlab.com/gitlab-org/labkit/correlation"
)
const (
@@ -34,18 +32,14 @@ type Cache interface {
Evict()
}
-func newCacheKey(sessionID string, repo repository.GitRepo) (key, bool) {
- if sessionID == "" {
- return key{}, false
- }
-
+func newCacheKey(sessionID string, repo repository.GitRepo) key {
return key{
sessionID: sessionID,
repoStorage: repo.GetStorageName(),
repoRelPath: repo.GetRelativePath(),
repoObjDir: repo.GetGitObjectDirectory(),
repoAltDir: strings.Join(repo.GetGitAlternateObjectDirectories(), ","),
- }, true
+ }
}
type key struct {
@@ -71,6 +65,9 @@ type BatchCache struct {
maxLen int
// ttl is the fixed ttl for cache entries
ttl time.Duration
+ // injectSpawnErrors is used for testing purposes only. If set to true, then spawned batch
+ // processes will simulate spawn errors.
+ injectSpawnErrors bool
// monitorTicker is the ticker used for the monitoring Goroutine.
monitorTicker *time.Ticker
monitorDone chan interface{}
@@ -83,11 +80,6 @@ type BatchCache struct {
entriesMutex sync.Mutex
entries []*entry
-
- // cachedProcessDone is a condition that gets signalled whenever a process is being
- // considered to be returned to the cache. This field is optional and must only be used in
- // tests.
- cachedProcessDone *sync.Cond
}
// NewCache creates a new catfile process cache.
@@ -95,6 +87,15 @@ func NewCache(cfg config.Cfg) *BatchCache {
return newCache(defaultBatchfileTTL, cfg.Git.CatfileCacheSize, defaultEvictionInterval)
}
+// Stop stops the monitoring Goroutine and evicts all cached processes. This must only be called
+// once.
+func (bc *BatchCache) Stop() {
+ bc.monitorTicker.Stop()
+ bc.monitorDone <- struct{}{}
+ <-bc.monitorDone
+ bc.Evict()
+}
+
func newCache(ttl time.Duration, maxLen int, refreshInterval time.Duration) *BatchCache {
if maxLen <= 0 {
maxLen = defaultMaxLen
@@ -169,91 +170,47 @@ func (bc *BatchCache) monitor() {
}
}
-// Stop stops the monitoring Goroutine and evicts all cached processes. This must only be called
-// once.
-func (bc *BatchCache) Stop() {
- bc.monitorTicker.Stop()
- bc.monitorDone <- struct{}{}
- <-bc.monitorDone
- bc.Evict()
-}
-
// BatchProcess creates a new Batch process for the given repository.
-func (bc *BatchCache) BatchProcess(ctx context.Context, repo git.RepositoryExecutor) (_ Batch, returnedErr error) {
- requestDone := ctx.Done()
- if requestDone == nil {
+func (bc *BatchCache) BatchProcess(ctx context.Context, repo git.RepositoryExecutor) (Batch, error) {
+ if ctx.Done() == nil {
panic("empty ctx.Done() in catfile.Batch.New()")
}
- cacheKey, isCacheable := newCacheKey(metadata.GetValue(ctx, SessionIDField), repo)
- if isCacheable {
- // We only try to look up cached batch processes in case it is cacheable, which
- // requires a session ID. This is mostly done such that git-cat-file(1) processes
- // from one user cannot interfer with those from another user. The main intent is to
- // disallow trivial denial of service attacks against other users in case it is
- // possible to poison the cache with broken git-cat-file(1) processes.
-
- if c, ok := bc.checkout(cacheKey); ok {
- go bc.returnWhenDone(requestDone, cacheKey, c)
- return c, nil
+ sessionID := metadata.GetValue(ctx, SessionIDField)
+ if sessionID == "" {
+ c, ctx, err := bc.newBatch(ctx, repo)
+ if err != nil {
+ return nil, err
}
+ return newInstrumentedBatch(ctx, c, bc.catfileLookupCounter), err
+ }
- // We have not found any cached process, so we need to create a new one. In this
- // case, we need to detach the process from the current context such that it does
- // not get killed when the current context is done. Note that while we explicitly
- // `Close()` processes in case this function fails, we must have a cancellable
- // context or otherwise our `command` package will panic.
- var cancel func()
- ctx, cancel = context.WithCancel(context.Background())
- defer func() {
- if returnedErr != nil {
- cancel()
- }
- }()
-
- // We have to decorrelate the process from the current context given that it
- // may potentially be reused across different RPC calls.
- ctx = correlation.ContextWithCorrelation(ctx, "")
- ctx = opentracing.ContextWithSpan(ctx, nil)
+ cacheKey := newCacheKey(sessionID, repo)
+ requestDone := ctx.Done()
+
+ if c, ok := bc.checkout(cacheKey); ok {
+ go bc.returnWhenDone(requestDone, cacheKey, c)
+ return newInstrumentedBatch(ctx, c, bc.catfileLookupCounter), nil
}
- c, err := newBatch(ctx, repo, bc.catfileLookupCounter)
+ // if we are using caching, create a fresh context for the new batch
+ // and initialize the new batch with a bc key and cancel function
+ cacheCtx, cacheCancel := context.WithCancel(context.Background())
+ c, ctx, err := bc.newBatch(cacheCtx, repo)
if err != nil {
+ cacheCancel()
return nil, err
}
- defer func() {
- // If we somehow fail after creating a new Batch process, then we want to kill
- // spawned processes right away.
- if returnedErr != nil {
- c.Close()
- }
- }()
-
- bc.totalCatfileProcesses.Inc()
- bc.currentCatfileProcesses.Inc()
- go func() {
- <-ctx.Done()
- bc.currentCatfileProcesses.Dec()
- }()
-
- if isCacheable {
- // If the process is cacheable, then we want to put the process into the cache when
- // the current outer context is done.
- go bc.returnWhenDone(requestDone, cacheKey, c)
- }
- return c, nil
+ c.cancel = cacheCancel
+ go bc.returnWhenDone(requestDone, cacheKey, c)
+
+ return newInstrumentedBatch(ctx, c, bc.catfileLookupCounter), nil
}
func (bc *BatchCache) returnWhenDone(done <-chan struct{}, cacheKey key, c *batch) {
<-done
- if bc.cachedProcessDone != nil {
- defer func() {
- bc.cachedProcessDone.Broadcast()
- }()
- }
-
if c == nil || c.isClosed() {
return
}
diff --git a/internal/git/catfile/batch_cache_test.go b/internal/git/catfile/batch_cache_test.go
index 9816c0d7d..885978a7c 100644
--- a/internal/git/catfile/batch_cache_test.go
+++ b/internal/git/catfile/batch_cache_test.go
@@ -1,87 +1,70 @@
package catfile
import (
- "context"
- "errors"
- "io"
- "os"
- "sync"
+ "fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v14/internal/git/repository"
- "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
- "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
- "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
- "gitlab.com/gitlab-org/labkit/correlation"
- "google.golang.org/grpc/metadata"
)
-func TestCache_add(t *testing.T) {
+func TestCacheAdd(t *testing.T) {
const maxLen = 3
bc := newCache(time.Hour, maxLen, defaultEvictionInterval)
- cfg, repo, _ := testcfg.BuildWithRepo(t)
-
- key0 := mustCreateKey(t, "0", repo)
- value0 := mustCreateBatch(t, cfg, repo)
+ key0 := testKey(0)
+ value0 := testValue()
bc.add(key0, value0)
requireCacheValid(t, bc)
- key1 := mustCreateKey(t, "1", repo)
- bc.add(key1, mustCreateBatch(t, cfg, repo))
+ key1 := testKey(1)
+ bc.add(key1, testValue())
requireCacheValid(t, bc)
- key2 := mustCreateKey(t, "2", repo)
- bc.add(key2, mustCreateBatch(t, cfg, repo))
+ key2 := testKey(2)
+ bc.add(key2, testValue())
requireCacheValid(t, bc)
// Because maxLen is 3, and key0 is oldest, we expect that adding key3
// will kick out key0.
- key3 := mustCreateKey(t, "3", repo)
- bc.add(key3, mustCreateBatch(t, cfg, repo))
+ key3 := testKey(3)
+ bc.add(key3, testValue())
requireCacheValid(t, bc)
require.Equal(t, maxLen, bc.len(), "length should be maxLen")
require.True(t, value0.isClosed(), "value0 should be closed")
- require.Equal(t, []key{key1, key2, key3}, keys(t, bc))
+ require.Equal(t, []key{key1, key2, key3}, keys(bc))
}
-func TestCache_addTwice(t *testing.T) {
+func TestCacheAddTwice(t *testing.T) {
bc := newCache(time.Hour, 10, defaultEvictionInterval)
- cfg, repo, _ := testcfg.BuildWithRepo(t)
-
- key0 := mustCreateKey(t, "0", repo)
- value0 := mustCreateBatch(t, cfg, repo)
+ key0 := testKey(0)
+ value0 := testValue()
bc.add(key0, value0)
requireCacheValid(t, bc)
- key1 := mustCreateKey(t, "1", repo)
- value1 := mustCreateBatch(t, cfg, repo)
- bc.add(key1, value1)
+ key1 := testKey(1)
+ bc.add(key1, testValue())
requireCacheValid(t, bc)
require.Equal(t, key0, bc.head().key, "key0 should be oldest key")
- value2 := mustCreateBatch(t, cfg, repo)
+ value2 := testValue()
bc.add(key0, value2)
requireCacheValid(t, bc)
require.Equal(t, key1, bc.head().key, "key1 should be oldest key")
- require.Equal(t, value1, bc.head().value)
+ require.Equal(t, value2, bc.head().value)
require.True(t, value0.isClosed(), "value0 should be closed")
}
-func TestCache_checkout(t *testing.T) {
+func TestCacheCheckout(t *testing.T) {
bc := newCache(time.Hour, 10, defaultEvictionInterval)
- cfg, repo, _ := testcfg.BuildWithRepo(t)
-
- key0 := mustCreateKey(t, "0", repo)
- value0 := mustCreateBatch(t, cfg, repo)
+ key0 := testKey(0)
+ value0 := testValue()
bc.add(key0, value0)
v, ok := bc.checkout(key{sessionID: "foo"})
@@ -102,33 +85,31 @@ func TestCache_checkout(t *testing.T) {
require.Nil(t, v, "value from second checkout")
}
-func TestCache_enforceTTL(t *testing.T) {
+func TestCacheEnforceTTL(t *testing.T) {
ttl := time.Hour
bc := newCache(ttl, 10, defaultEvictionInterval)
- cfg, repo, _ := testcfg.BuildWithRepo(t)
-
sleep := func() { time.Sleep(2 * time.Millisecond) }
- key0 := mustCreateKey(t, "0", repo)
- value0 := mustCreateBatch(t, cfg, repo)
+ key0 := testKey(0)
+ value0 := testValue()
bc.add(key0, value0)
sleep()
- key1 := mustCreateKey(t, "1", repo)
- value1 := mustCreateBatch(t, cfg, repo)
+ key1 := testKey(1)
+ value1 := testValue()
bc.add(key1, value1)
sleep()
cutoff := time.Now().Add(ttl)
sleep()
- key2 := mustCreateKey(t, "2", repo)
- bc.add(key2, mustCreateBatch(t, cfg, repo))
+ key2 := testKey(2)
+ bc.add(key2, testValue())
sleep()
- key3 := mustCreateKey(t, "3", repo)
- bc.add(key3, mustCreateBatch(t, cfg, repo))
+ key3 := testKey(3)
+ bc.add(key3, testValue())
sleep()
requireCacheValid(t, bc)
@@ -142,198 +123,40 @@ func TestCache_enforceTTL(t *testing.T) {
require.True(t, v.isClosed(), "value %d %v should be closed", i, v)
}
- require.Equal(t, []key{key2, key3}, keys(t, bc), "remaining keys after EnforceTTL")
+ require.Equal(t, []key{key2, key3}, keys(bc), "remaining keys after EnforceTTL")
bc.enforceTTL(cutoff)
requireCacheValid(t, bc)
- require.Equal(t, []key{key2, key3}, keys(t, bc), "remaining keys after second EnforceTTL")
+ require.Equal(t, []key{key2, key3}, keys(bc), "remaining keys after second EnforceTTL")
}
-func TestCache_autoExpiry(t *testing.T) {
+func TestAutoExpiry(t *testing.T) {
ttl := 5 * time.Millisecond
refresh := 1 * time.Millisecond
bc := newCache(ttl, 10, refresh)
- cfg, repo, _ := testcfg.BuildWithRepo(t)
-
- key0 := mustCreateKey(t, "0", repo)
- value0 := mustCreateBatch(t, cfg, repo)
+ key0 := testKey(0)
+ value0 := testValue()
bc.add(key0, value0)
requireCacheValid(t, bc)
- require.Contains(t, keys(t, bc), key0, "key should still be in map")
+ require.Contains(t, keys(bc), key0, "key should still be in map")
require.False(t, value0.isClosed(), "value should not have been closed")
// Wait for the monitor goroutine to do its thing
for i := 0; i < 100; i++ {
- if len(keys(t, bc)) == 0 {
+ if len(keys(bc)) == 0 {
break
}
time.Sleep(refresh)
}
- require.Empty(t, keys(t, bc), "key should no longer be in map")
+ require.Empty(t, keys(bc), "key should no longer be in map")
require.True(t, value0.isClosed(), "value should be closed after eviction")
}
-func TestCache_BatchProcess(t *testing.T) {
- cfg, repo, _ := testcfg.BuildWithRepo(t)
- repoExecutor := newRepoExecutor(t, cfg, repo)
-
- cache := newCache(time.Hour, 10, time.Hour)
- defer cache.Evict()
- cache.cachedProcessDone = sync.NewCond(&sync.Mutex{})
-
- t.Run("uncancellable", func(t *testing.T) {
- ctx := context.Background()
-
- require.PanicsWithValue(t, "empty ctx.Done() in catfile.Batch.New()", func() {
- _, _ = cache.BatchProcess(ctx, repoExecutor)
- })
- })
-
- t.Run("uncacheable", func(t *testing.T) {
- ctx, cancel := testhelper.Context()
- defer cancel()
- ctx = correlation.ContextWithCorrelation(ctx, "1")
-
- // The context doesn't carry a session ID and is thus uncacheable.
- // The process should never get returned to the cache and must be
- // killed on context cancellation.
- batchProcess, err := cache.BatchProcess(ctx, repoExecutor)
- require.NoError(t, err)
-
- batch, ok := batchProcess.(*batch)
- require.True(t, ok, "expected batch")
-
- correlation := correlation.ExtractFromContext(batch.objectReader.creationCtx)
- require.Equal(t, "1", correlation)
-
- cancel()
-
- // We're cheating a bit here to avoid creating a racy test by reaching into the
- // batch processes and trying to read from their stdout. If the cancel did kill the
- // process as expected, then the stdout should be closed and we'll get an EOF.
- for _, reader := range []io.Reader{batch.objectInfoReader.cmd, batch.objectReader.cmd} {
- output, err := io.ReadAll(reader)
- if err != nil {
- require.True(t, errors.Is(err, os.ErrClosed))
- } else {
- require.NoError(t, err)
- }
- require.Empty(t, output)
- }
-
- // This is another bug: while we do not have any resource leaks because processes
- // got killed as expected, the batch itself is not considered to have been closed.
- require.False(t, batch.isClosed())
-
- require.Empty(t, keys(t, cache))
- })
-
- t.Run("cacheable", func(t *testing.T) {
- defer cache.Evict()
-
- ctx, cancel := testhelper.Context()
- defer cancel()
- ctx = correlation.ContextWithCorrelation(ctx, "1")
- ctx = testhelper.MergeIncomingMetadata(ctx,
- metadata.Pairs(SessionIDField, "1"),
- )
-
- batchProcess, err := cache.BatchProcess(ctx, repoExecutor)
- require.NoError(t, err)
-
- batch, ok := batchProcess.(*batch)
- require.True(t, ok, "expected instrumented batch")
-
- // The correlation ID must be empty given that this will be a cached long-running
- // processes that can be reused across multpile RPC calls.
- correlation := correlation.ExtractFromContext(batch.objectReader.creationCtx)
- require.Empty(t, correlation)
-
- // Cancel the context such that the process will be considered for return to the
- // cache and wait for the cache to collect it.
- cache.cachedProcessDone.L.Lock()
- cancel()
- defer cache.cachedProcessDone.L.Unlock()
- cache.cachedProcessDone.Wait()
-
- keys := keys(t, cache)
- require.Equal(t, []key{{
- sessionID: "1",
- repoStorage: repo.GetStorageName(),
- repoRelPath: repo.GetRelativePath(),
- }}, keys)
-
- // Assert that we can still read from the cached process.
- _, err = batchProcess.Info(ctx, "refs/heads/master")
- require.NoError(t, err)
- })
-
- t.Run("dirty process does not get cached", func(t *testing.T) {
- defer cache.Evict()
-
- ctx, cancel := testhelper.Context()
- defer cancel()
- ctx = testhelper.MergeIncomingMetadata(ctx,
- metadata.Pairs(SessionIDField, "1"),
- )
-
- batchProcess, err := cache.BatchProcess(ctx, repoExecutor)
- require.NoError(t, err)
-
- // While we request object data, we do not consume it at all. The reader is thus
- // dirty and cannot be reused and shouldn't be returned to the cache.
- _, err = batchProcess.Commit(ctx, "refs/heads/master")
- require.NoError(t, err)
-
- // Cancel the context such that the process will be considered for return to the
- // cache and wait for the cache to collect it.
- cache.cachedProcessDone.L.Lock()
- cancel()
- defer cache.cachedProcessDone.L.Unlock()
- cache.cachedProcessDone.Wait()
-
- require.Empty(t, keys(t, cache))
-
- // The process should be killed now.
- _, err = batchProcess.Info(ctx, "refs/heads/master")
- require.True(t, errors.Is(err, os.ErrClosed))
- })
-
- t.Run("closed process does not get cached", func(t *testing.T) {
- defer cache.Evict()
-
- ctx, cancel := testhelper.Context()
- defer cancel()
- ctx = testhelper.MergeIncomingMetadata(ctx,
- metadata.Pairs(SessionIDField, "1"),
- )
-
- batchProcess, err := cache.BatchProcess(ctx, repoExecutor)
- require.NoError(t, err)
-
- batch, ok := batchProcess.(*batch)
- require.True(t, ok, "expected batch")
-
- // Closed processes naturally cannot be reused anymore and thus shouldn't ever get
- // cached.
- batch.Close()
-
- // Cancel the context such that the process will be considered for return to the
- // cache and wait for the cache to collect it.
- cache.cachedProcessDone.L.Lock()
- cancel()
- defer cache.cachedProcessDone.L.Unlock()
- cache.cachedProcessDone.Wait()
-
- require.Empty(t, keys(t, cache))
- })
-}
-
func requireCacheValid(t *testing.T, bc *BatchCache) {
bc.entriesMutex.Lock()
defer bc.entriesMutex.Unlock()
@@ -344,30 +167,11 @@ func requireCacheValid(t *testing.T, bc *BatchCache) {
}
}
-func mustCreateBatch(t *testing.T, cfg config.Cfg, repo repository.GitRepo) *batch {
- t.Helper()
-
- ctx, cancel := testhelper.Context()
- t.Cleanup(cancel)
-
- batch, err := newBatch(ctx, newRepoExecutor(t, cfg, repo), nil)
- require.NoError(t, err)
-
- return batch
-}
-
-func mustCreateKey(t *testing.T, sessionID string, repo repository.GitRepo) key {
- t.Helper()
-
- key, cacheable := newCacheKey(sessionID, repo)
- require.True(t, cacheable)
-
- return key
-}
+func testValue() *batch { return &batch{} }
-func keys(t *testing.T, bc *BatchCache) []key {
- t.Helper()
+func testKey(i int) key { return key{sessionID: fmt.Sprintf("key-%d", i)} }
+func keys(bc *BatchCache) []key {
bc.entriesMutex.Lock()
defer bc.entriesMutex.Unlock()
diff --git a/internal/git/catfile/batch_check_process.go b/internal/git/catfile/batch_check_process.go
new file mode 100644
index 000000000..2ff5256e5
--- /dev/null
+++ b/internal/git/catfile/batch_check_process.go
@@ -0,0 +1,67 @@
+package catfile
+
+import (
+ "bufio"
+ "context"
+ "fmt"
+ "io"
+ "sync"
+
+ "github.com/opentracing/opentracing-go"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git"
+)
+
+// batchCheckProcess encapsulates a 'git cat-file --batch-check' process
+type batchCheckProcess struct {
+ r *bufio.Reader
+ w io.WriteCloser
+ sync.Mutex
+}
+
+func (bc *BatchCache) newBatchCheckProcess(ctx context.Context, repo git.RepositoryExecutor) (*batchCheckProcess, error) {
+ process := &batchCheckProcess{}
+
+ var stdinReader io.Reader
+ stdinReader, process.w = io.Pipe()
+
+ span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.BatchCheckProcess")
+
+ batchCmd, err := repo.Exec(ctx,
+ git.SubCmd{
+ Name: "cat-file",
+ Flags: []git.Option{
+ git.Flag{Name: "--batch-check"},
+ },
+ },
+ git.WithStdin(stdinReader),
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ process.r = bufio.NewReader(batchCmd)
+ go func() {
+ <-ctx.Done()
+ // This is crucial to prevent leaking file descriptors.
+ process.w.Close()
+ span.Finish()
+ }()
+
+ if bc.injectSpawnErrors {
+ // Testing only: intentionally leak process
+ return nil, &simulatedBatchSpawnError{}
+ }
+
+ return process, nil
+}
+
+func (bc *batchCheckProcess) info(revision git.Revision) (*ObjectInfo, error) {
+ bc.Lock()
+ defer bc.Unlock()
+
+ if _, err := fmt.Fprintln(bc.w, revision.String()); err != nil {
+ return nil, err
+ }
+
+ return ParseObjectInfo(bc.r)
+}
diff --git a/internal/git/catfile/batch_process.go b/internal/git/catfile/batch_process.go
new file mode 100644
index 000000000..30a79c963
--- /dev/null
+++ b/internal/git/catfile/batch_process.go
@@ -0,0 +1,148 @@
+package catfile
+
+import (
+ "bufio"
+ "context"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "sync"
+
+ "github.com/opentracing/opentracing-go"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git"
+)
+
+// batchProcess encapsulates a 'git cat-file --batch' process
+type batchProcess struct {
+	r *bufio.Reader  // buffered reader over the command returned by repo.Exec
+	w io.WriteCloser // write end of the io.Pipe passed to the command as stdin
+
+	// n is a state machine that tracks how much data we still have to read
+	// from r. Legal states are: n==0, this means we can do a new request on
+	// the cat-file process. n==1, this means that we have to discard a
+	// trailing newline. n>1, this means we are in the middle of reading a
+	// raw git object.
+	n int64
+
+	// Even though the batchProcess type should not be used concurrently, I think
+	// that if that does happen by mistake we should give proper errors
+	// instead of doing unsafe memory writes (to n) and failing in some
+	// unpredictable way.
+	sync.Mutex
+}
+
+// newBatchProcess starts a 'git cat-file --batch' command for repo and returns
+// a batchProcess connected to it. The command reads its requests from an
+// in-memory pipe and its output is wrapped in a buffered reader.
+//
+// bc.totalCatfileProcesses is incremented on every call, before the command is
+// started. bc.currentCatfileProcesses is incremented only after the command
+// has started and is decremented again once ctx is done.
+//
+// A tracing span named "catfile.BatchProcess" is started for the process. Once
+// the command is running, a goroutine waits for ctx to be done, then closes
+// the stdin pipe, decrements the gauge and finishes the span. If the command
+// cannot be started, the pipe is closed and the span is finished right away so
+// that neither outlives the failed call.
+//
+// When bc.injectSpawnErrors is set (testing only), the command is started and
+// then deliberately abandoned, and a *simulatedBatchSpawnError is returned.
+func (bc *BatchCache) newBatchProcess(ctx context.Context, repo git.RepositoryExecutor) (*batchProcess, error) {
+	bc.totalCatfileProcesses.Inc()
+	b := &batchProcess{}
+
+	var stdinReader io.Reader
+	stdinReader, b.w = io.Pipe()
+
+	span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.BatchProcess")
+
+	batchCmd, err := repo.Exec(ctx,
+		git.SubCmd{
+			Name: "cat-file",
+			Flags: []git.Option{
+				git.Flag{Name: "--batch"},
+			},
+		},
+		git.WithStdin(stdinReader),
+	)
+	if err != nil {
+		// The cleanup goroutine below is never started on this path, so
+		// release the pipe and finish the span here instead of leaking them.
+		// currentCatfileProcesses has not been incremented yet, so there is
+		// nothing to decrement.
+		_ = b.w.Close()
+		span.Finish()
+		return nil, err
+	}
+
+	b.r = bufio.NewReader(batchCmd)
+
+	bc.currentCatfileProcesses.Inc()
+	go func() {
+		<-ctx.Done()
+		// This Close() is crucial to prevent leaking file descriptors.
+		b.w.Close()
+		bc.currentCatfileProcesses.Dec()
+		span.Finish()
+	}()
+
+	if bc.injectSpawnErrors {
+		// Testing only: intentionally leak process
+		return nil, &simulatedBatchSpawnError{}
+	}
+
+	return b, nil
+}
+
+// reader requests revision from the cat-file process and returns an Object
+// whose Reader yields exactly the object's contents.
+//
+// A new request is only possible when the previous object has been read
+// completely; a pending trailing linefeed is discarded first. If the object
+// reported by the process is not of expectedType, its contents are drained so
+// the process stays usable and a NotFoundError is returned.
+func (b *batchProcess) reader(revision git.Revision, expectedType string) (*Object, error) {
+	b.Lock()
+	defer b.Unlock()
+
+	switch {
+	case b.n == 1:
+		// Only the linefeed trailing the previous object is left; drop it.
+		if _, err := b.r.ReadByte(); err != nil {
+			return nil, err
+		}
+		b.n = 0
+	case b.n != 0:
+		return nil, fmt.Errorf("cannot create new Object: batch contains %d unread bytes", b.n)
+	}
+
+	if _, err := fmt.Fprintln(b.w, revision.String()); err != nil {
+		return nil, err
+	}
+
+	info, err := ParseObjectInfo(b.r)
+	if err != nil {
+		return nil, err
+	}
+
+	// The object's contents plus one trailing linefeed are now pending on b.r.
+	b.n = info.Size + 1
+
+	if info.Type == expectedType {
+		return &Object{
+			ObjectInfo: *info,
+			Reader: &batchReader{
+				batchProcess: b,
+				r:            io.LimitReader(b.r, info.Size),
+			},
+		}, nil
+	}
+
+	// A type mismatch is a programmer error and should never happen. If it
+	// does, throw away the pending data so that the cat-file process is left
+	// in a good state for the next request.
+	if _, err := io.CopyN(ioutil.Discard, b.r, b.n); err != nil {
+		return nil, err
+	}
+	b.n = 0
+
+	return nil, NotFoundError{error: fmt.Errorf("expected %s to be a %s, got %s", info.Oid, expectedType, info.Type)}
+}
+
+// consume records that nBytes of the current object have been read. It panics
+// if that leaves fewer than one pending byte, since the trailing linefeed must
+// always remain after object data has been read.
+func (b *batchProcess) consume(nBytes int) {
+	remaining := b.n - int64(nBytes)
+	b.n = remaining
+	if remaining < 1 {
+		panic("too many bytes read from batch")
+	}
+}
+
+// hasUnreadData reports whether more than one pending byte remains, i.e.
+// whether something other than a trailing linefeed is still unread.
+func (b *batchProcess) hasUnreadData() bool {
+	b.Lock()
+	unread := b.n > 1
+	b.Unlock()
+
+	return unread
+}
+
+type batchReader struct { // reader handed out as Object.Reader by batchProcess.reader
+	*batchProcess           // owning process; Read locks it and reports consumed bytes to it
+	r             io.Reader // source that Read delegates to; batchProcess.reader sets it to an io.LimitReader
+}
+
+// Read reads object data into p while holding the lock of the owning
+// batchProcess, and reports the number of bytes read to it via consume.
+func (br *batchReader) Read(p []byte) (int, error) {
+	// Lock, Unlock and consume are promoted from the embedded *batchProcess.
+	br.Lock()
+	defer br.Unlock()
+
+	read, err := br.r.Read(p)
+	br.consume(read)
+
+	return read, err
+}
diff --git a/internal/git/catfile/batch_test.go b/internal/git/catfile/batch_test.go
index 95e62cb96..19710e900 100644
--- a/internal/git/catfile/batch_test.go
+++ b/internal/git/catfile/batch_test.go
@@ -3,7 +3,6 @@ package catfile
import (
"bytes"
"context"
- "fmt"
"io"
"os"
"os/exec"
@@ -14,6 +13,7 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/command"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/repository"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
@@ -22,11 +22,30 @@ import (
"google.golang.org/grpc/metadata"
)
+type repoExecutor struct { // test helper that runs git commands in a repository via a command factory
+	repository.GitRepo               // repository passed to gitCmdFactory.New by Exec
+	gitCmdFactory git.CommandFactory // factory Exec uses to create commands
+}
+
+// Exec creates cmd for the embedded repository through the command factory and
+// returns the resulting command.
+func (e *repoExecutor) Exec(ctx context.Context, cmd git.Cmd, opts ...git.CmdOpt) (*command.Command, error) {
+	factory := e.gitCmdFactory
+	return factory.New(ctx, e.GitRepo, cmd, opts...)
+}
+
+// ExecAndWait runs cmd via Exec and waits for it to finish. It returns the
+// error from Exec if the command could not be created, otherwise the result of
+// Wait.
+func (e *repoExecutor) ExecAndWait(ctx context.Context, cmd git.Cmd, opts ...git.CmdOpt) error {
+	spawned, err := e.Exec(ctx, cmd, opts...)
+	if err == nil {
+		err = spawned.Wait()
+	}
+	return err
+}
+
func setupBatch(t *testing.T, ctx context.Context) (config.Cfg, Batch, *gitalypb.Repository) {
t.Helper()
cfg, repo, _ := testcfg.BuildWithRepo(t)
- repoExecutor := newRepoExecutor(t, cfg, repo)
+ repoExecutor := &repoExecutor{
+ GitRepo: repo, gitCmdFactory: git.NewExecCommandFactory(cfg),
+ }
cache := newCache(1*time.Hour, 1000, defaultEvictionInterval)
batch, err := cache.BatchProcess(ctx, repoExecutor)
@@ -327,14 +346,6 @@ func TestRepeatedCalls(t *testing.T) {
require.Equal(t, string(treeBytes), string(tree2))
}
-type failingTestRepoExecutor struct {
- git.RepositoryExecutor
-}
-
-func (e failingTestRepoExecutor) Exec(context.Context, git.Cmd, ...git.CmdOpt) (*command.Command, error) {
- return nil, fmt.Errorf("simulated error")
-}
-
func TestSpawnFailure(t *testing.T) {
cfg, testRepo, _ := testcfg.BuildWithRepo(t)
testRepoExecutor := &repoExecutor{
@@ -380,12 +391,10 @@ func TestSpawnFailure(t *testing.T) {
ctx2, cancel2 := testhelper.Context()
defer cancel2()
- failingTestRepoExecutor := failingTestRepoExecutor{
- RepositoryExecutor: testRepoExecutor,
- }
- _, err = catfileWithFreshSessionID(ctx2, cache, failingTestRepoExecutor)
+ cache.injectSpawnErrors = true
+ _, err = catfileWithFreshSessionID(ctx2, cache, testRepoExecutor)
require.Error(t, err, "expect simulated error")
- require.EqualError(t, err, "simulated error")
+ require.IsType(t, &simulatedBatchSpawnError{}, err)
require.True(
t,
diff --git a/internal/git/catfile/object.go b/internal/git/catfile/object.go
new file mode 100644
index 000000000..04363ad21
--- /dev/null
+++ b/internal/git/catfile/object.go
@@ -0,0 +1,13 @@
+package catfile
+
+import (
+ "io"
+)
+
+// Object represents data returned by `git cat-file --batch`
+type Object struct {
+	// ObjectInfo represents main information about object
+	ObjectInfo
+	// Reader provides raw data about object. It differs for each type of object (tag, commit, tree, blob, etc.)
+	io.Reader
+}
diff --git a/internal/git/catfile/object_info_reader.go b/internal/git/catfile/object_info_reader.go
deleted file mode 100644
index 75d38c01e..000000000
--- a/internal/git/catfile/object_info_reader.go
+++ /dev/null
@@ -1,143 +0,0 @@
-package catfile
-
-import (
- "bufio"
- "context"
- "fmt"
- "strconv"
- "strings"
- "sync"
-
- "github.com/opentracing/opentracing-go"
- "github.com/prometheus/client_golang/prometheus"
- "gitlab.com/gitlab-org/gitaly/v14/internal/command"
- "gitlab.com/gitlab-org/gitaly/v14/internal/git"
-)
-
-// ObjectInfo represents a header returned by `git cat-file --batch`
-type ObjectInfo struct {
- Oid git.ObjectID
- Type string
- Size int64
-}
-
-// NotFoundError is returned when requesting an object that does not exist.
-type NotFoundError struct{ error }
-
-// IsNotFound tests whether err has type NotFoundError.
-func IsNotFound(err error) bool {
- _, ok := err.(NotFoundError)
- return ok
-}
-
-// IsBlob returns true if object type is "blob"
-func (o *ObjectInfo) IsBlob() bool {
- return o.Type == "blob"
-}
-
-// ParseObjectInfo reads from a reader and parses the data into an ObjectInfo struct
-func ParseObjectInfo(stdout *bufio.Reader) (*ObjectInfo, error) {
- infoLine, err := stdout.ReadString('\n')
- if err != nil {
- return nil, fmt.Errorf("read info line: %w", err)
- }
-
- infoLine = strings.TrimSuffix(infoLine, "\n")
- if strings.HasSuffix(infoLine, " missing") {
- return nil, NotFoundError{fmt.Errorf("object not found")}
- }
-
- info := strings.Split(infoLine, " ")
- if len(info) != 3 {
- return nil, fmt.Errorf("invalid info line: %q", infoLine)
- }
-
- oid, err := git.NewObjectIDFromHex(info[0])
- if err != nil {
- return nil, fmt.Errorf("parse object ID: %w", err)
- }
-
- objectSize, err := strconv.ParseInt(info[2], 10, 64)
- if err != nil {
- return nil, fmt.Errorf("parse object size: %w", err)
- }
-
- return &ObjectInfo{
- Oid: oid,
- Type: info[1],
- Size: objectSize,
- }, nil
-}
-
-// objectInfoReader is a reader for Git object information. This reader is implemented via a
-// long-lived `git cat-file --batch-check` process such that we do not have to spawn a separate
-// process per object info we're about to read.
-type objectInfoReader struct {
- cmd *command.Command
- stdout *bufio.Reader
- sync.Mutex
-
- // creationCtx is the context in which this reader has been created. This context may
- // potentially be decorrelated from the "real" RPC context in case the reader is going to be
- // cached.
- creationCtx context.Context
- counter *prometheus.CounterVec
-}
-
-func newObjectInfoReader(
- ctx context.Context,
- repo git.RepositoryExecutor,
- counter *prometheus.CounterVec,
-) (*objectInfoReader, error) {
- span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.ObjectInfoReader")
-
- batchCmd, err := repo.Exec(ctx,
- git.SubCmd{
- Name: "cat-file",
- Flags: []git.Option{
- git.Flag{Name: "--batch-check"},
- },
- },
- git.WithStdin(command.SetupStdin),
- )
- if err != nil {
- return nil, err
- }
-
- objectInfoReader := &objectInfoReader{
- cmd: batchCmd,
- stdout: bufio.NewReader(batchCmd),
- creationCtx: ctx,
- counter: counter,
- }
- go func() {
- <-ctx.Done()
- // This is crucial to prevent leaking file descriptors.
- objectInfoReader.Close()
- span.Finish()
- }()
-
- return objectInfoReader, nil
-}
-
-func (o *objectInfoReader) Close() {
- _ = o.cmd.Wait()
-}
-
-func (o *objectInfoReader) info(ctx context.Context, revision git.Revision) (*ObjectInfo, error) {
- finish := startSpan(o.creationCtx, ctx, "Batch.Info", revision)
- defer finish()
-
- if o.counter != nil {
- o.counter.WithLabelValues("info").Inc()
- }
-
- o.Lock()
- defer o.Unlock()
-
- if _, err := fmt.Fprintln(o.cmd, revision.String()); err != nil {
- return nil, err
- }
-
- return ParseObjectInfo(o.stdout)
-}
diff --git a/internal/git/catfile/object_info_reader_test.go b/internal/git/catfile/object_info_reader_test.go
deleted file mode 100644
index e2c6776cc..000000000
--- a/internal/git/catfile/object_info_reader_test.go
+++ /dev/null
@@ -1,164 +0,0 @@
-package catfile
-
-import (
- "bufio"
- "fmt"
- "strings"
- "testing"
-
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/testutil"
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v14/internal/git"
- "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
- "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
- "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
- "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
-)
-
-func TestParseObjectInfoSuccess(t *testing.T) {
- testCases := []struct {
- desc string
- input string
- output *ObjectInfo
- notFound bool
- }{
- {
- desc: "existing object",
- input: "7c9373883988204e5a9f72c4a5119cbcefc83627 commit 222\n",
- output: &ObjectInfo{
- Oid: "7c9373883988204e5a9f72c4a5119cbcefc83627",
- Type: "commit",
- Size: 222,
- },
- },
- {
- desc: "non existing object",
- input: "bla missing\n",
- notFound: true,
- },
- }
-
- for _, tc := range testCases {
- t.Run(tc.desc, func(t *testing.T) {
- reader := bufio.NewReader(strings.NewReader(tc.input))
- output, err := ParseObjectInfo(reader)
- if tc.notFound {
- require.True(t, IsNotFound(err), "expect NotFoundError")
- return
- }
-
- require.NoError(t, err)
- require.Equal(t, tc.output, output)
- })
- }
-}
-
-func TestParseObjectInfoErrors(t *testing.T) {
- testCases := []struct {
- desc string
- input string
- }{
- {desc: "missing newline", input: "7c9373883988204e5a9f72c4a5119cbcefc83627 commit 222"},
- {desc: "too few words", input: "7c9373883988204e5a9f72c4a5119cbcefc83627 commit\n"},
- {desc: "too many words", input: "7c9373883988204e5a9f72c4a5119cbcefc83627 commit 222 bla\n"},
- {desc: "parse object size", input: "7c9373883988204e5a9f72c4a5119cbcefc83627 commit bla\n"},
- }
-
- for _, tc := range testCases {
- t.Run(tc.desc, func(t *testing.T) {
- reader := bufio.NewReader(strings.NewReader(tc.input))
- _, err := ParseObjectInfo(reader)
-
- require.Error(t, err)
- })
- }
-}
-
-func TestObjectInfoReader(t *testing.T) {
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- cfg, repoProto, repoPath := testcfg.BuildWithRepo(t)
-
- oiByRevision := make(map[string]*ObjectInfo)
- for _, revision := range []string{
- "refs/heads/master",
- "refs/heads/master^{tree}",
- "refs/heads/master:README",
- "refs/tags/v1.1.1",
- } {
- revParseOutput := gittest.Exec(t, cfg, "-C", repoPath, "rev-parse", revision)
- objectID, err := git.NewObjectIDFromHex(text.ChompBytes(revParseOutput))
- require.NoError(t, err)
-
- objectType := text.ChompBytes(gittest.Exec(t, cfg, "-C", repoPath, "cat-file", "-t", revision))
- objectContents := gittest.Exec(t, cfg, "-C", repoPath, "cat-file", objectType, revision)
-
- oiByRevision[revision] = &ObjectInfo{
- Oid: objectID,
- Type: objectType,
- Size: int64(len(objectContents)),
- }
- }
-
- for _, tc := range []struct {
- desc string
- revision git.Revision
- expectedErr error
- expectedInfo *ObjectInfo
- }{
- {
- desc: "commit by ref",
- revision: "refs/heads/master",
- expectedInfo: oiByRevision["refs/heads/master"],
- },
- {
- desc: "commit by ID",
- revision: oiByRevision["refs/heads/master"].Oid.Revision(),
- expectedInfo: oiByRevision["refs/heads/master"],
- },
- {
- desc: "tree",
- revision: oiByRevision["refs/heads/master^{tree}"].Oid.Revision(),
- expectedInfo: oiByRevision["refs/heads/master^{tree}"],
- },
- {
- desc: "blob",
- revision: oiByRevision["refs/heads/master:README"].Oid.Revision(),
- expectedInfo: oiByRevision["refs/heads/master:README"],
- },
- {
- desc: "tag",
- revision: oiByRevision["refs/tags/v1.1.1"].Oid.Revision(),
- expectedInfo: oiByRevision["refs/tags/v1.1.1"],
- },
- {
- desc: "nonexistent ref",
- revision: "refs/heads/does-not-exist",
- expectedErr: NotFoundError{fmt.Errorf("object not found")},
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- counter := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"type"})
-
- reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), counter)
- require.NoError(t, err)
-
- require.Equal(t, float64(0), testutil.ToFloat64(counter.WithLabelValues("info")))
-
- info, err := reader.info(ctx, tc.revision)
- require.Equal(t, tc.expectedErr, err)
- require.Equal(t, tc.expectedInfo, info)
-
- require.Equal(t, float64(1), testutil.ToFloat64(counter.WithLabelValues("info")))
-
- // Verify that we do another request no matter whether the previous call
- // succeeded or failed.
- _, err = reader.info(ctx, "refs/heads/master")
- require.NoError(t, err)
-
- require.Equal(t, float64(2), testutil.ToFloat64(counter.WithLabelValues("info")))
- })
- }
-}
diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go
deleted file mode 100644
index 4d272d041..000000000
--- a/internal/git/catfile/object_reader.go
+++ /dev/null
@@ -1,174 +0,0 @@
-package catfile
-
-import (
- "bufio"
- "context"
- "fmt"
- "io"
- "sync"
-
- "github.com/opentracing/opentracing-go"
- "github.com/prometheus/client_golang/prometheus"
- "gitlab.com/gitlab-org/gitaly/v14/internal/command"
- "gitlab.com/gitlab-org/gitaly/v14/internal/git"
-)
-
-// Object represents data returned by `git cat-file --batch`
-type Object struct {
- // ObjectInfo represents main information about object
- ObjectInfo
- // Reader provides raw data about object. It differs for each type of object(tag, commit, tree, log, etc.)
- io.Reader
-}
-
-// objectReader is a reader for Git objects. Reading is implemented via a long-lived `git cat-file
-// --batch` process such that we do not have to spawn a new process for each object we are about to
-// read.
-type objectReader struct {
- cmd *command.Command
- stdout *bufio.Reader
-
- // n is a state machine that tracks how much data we still have to read
- // from r. Legal states are: n==0, this means we can do a new request on
- // the cat-file process. n==1, this means that we have to discard a
- // trailing newline. n>0, this means we are in the middle of reading a
- // raw git object.
- n int64
-
- // Even though the batch type should not be used concurrently, I think
- // that if that does happen by mistake we should give proper errors
- // instead of doing unsafe memory writes (to n) and failing in some
- // unpredictable way.
- sync.Mutex
-
- // creationCtx is the context in which this reader has been created. This context may
- // potentially be decorrelated from the "real" RPC context in case the reader is going to be
- // cached.
- creationCtx context.Context
- counter *prometheus.CounterVec
-}
-
-func newObjectReader(
- ctx context.Context,
- repo git.RepositoryExecutor,
- counter *prometheus.CounterVec,
-) (*objectReader, error) {
- span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.ObjectReader")
-
- batchCmd, err := repo.Exec(ctx,
- git.SubCmd{
- Name: "cat-file",
- Flags: []git.Option{
- git.Flag{Name: "--batch"},
- },
- },
- git.WithStdin(command.SetupStdin),
- )
- if err != nil {
- return nil, err
- }
-
- objectReader := &objectReader{
- cmd: batchCmd,
- stdout: bufio.NewReader(batchCmd),
- creationCtx: ctx,
- counter: counter,
- }
- go func() {
- <-ctx.Done()
- objectReader.Close()
- span.Finish()
- }()
-
- return objectReader, nil
-}
-
-func (o *objectReader) Close() {
- _ = o.cmd.Wait()
-}
-
-func (o *objectReader) reader(
- ctx context.Context,
- revision git.Revision,
- expectedType string,
-) (*Object, error) {
- finish := startSpan(o.creationCtx, ctx, fmt.Sprintf("Batch.Object(%s)", expectedType), revision)
- defer finish()
-
- if o.counter != nil {
- o.counter.WithLabelValues(expectedType).Inc()
- }
-
- o.Lock()
- defer o.Unlock()
-
- if o.n == 1 {
- // Consume linefeed
- if _, err := o.stdout.ReadByte(); err != nil {
- return nil, err
- }
- o.n--
- }
-
- if o.n != 0 {
- return nil, fmt.Errorf("cannot create new Object: batch contains %d unread bytes", o.n)
- }
-
- if _, err := fmt.Fprintln(o.cmd, revision.String()); err != nil {
- return nil, err
- }
-
- oi, err := ParseObjectInfo(o.stdout)
- if err != nil {
- return nil, err
- }
-
- o.n = oi.Size + 1
-
- if oi.Type != expectedType {
- // This is a programmer error and it should never happen. But if it does,
- // we need to leave the cat-file process in a good state
- if _, err := io.CopyN(io.Discard, o.stdout, o.n); err != nil {
- return nil, err
- }
- o.n = 0
-
- return nil, NotFoundError{error: fmt.Errorf("expected %s to be a %s, got %s", oi.Oid, expectedType, oi.Type)}
- }
-
- return &Object{
- ObjectInfo: *oi,
- Reader: &objectDataReader{
- objectReader: o,
- r: io.LimitReader(o.stdout, oi.Size),
- },
- }, nil
-}
-
-func (o *objectReader) consume(nBytes int) {
- o.n -= int64(nBytes)
- if o.n < 1 {
- panic("too many bytes read from batch")
- }
-}
-
-func (o *objectReader) hasUnreadData() bool {
- o.Lock()
- defer o.Unlock()
-
- return o.n > 1
-}
-
-type objectDataReader struct {
- *objectReader
- r io.Reader
-}
-
-func (o *objectDataReader) Read(p []byte) (int, error) {
- o.objectReader.Lock()
- defer o.objectReader.Unlock()
-
- n, err := o.r.Read(p)
- o.objectReader.consume(n)
- return n, err
-}
diff --git a/internal/git/catfile/object_reader_test.go b/internal/git/catfile/object_reader_test.go
deleted file mode 100644
index 50f07c4e2..000000000
--- a/internal/git/catfile/object_reader_test.go
+++ /dev/null
@@ -1,137 +0,0 @@
-package catfile
-
-import (
- "fmt"
- "io"
- "testing"
-
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/testutil"
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v14/internal/git"
- "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
- "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
- "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
- "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
-)
-
-func TestObjectReader_reader(t *testing.T) {
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- cfg, repoProto, repoPath := testcfg.BuildWithRepo(t)
-
- commitID, err := git.NewObjectIDFromHex(text.ChompBytes(gittest.Exec(t, cfg, "-C", repoPath, "rev-parse", "refs/heads/master")))
- require.NoError(t, err)
- commitContents := gittest.Exec(t, cfg, "-C", repoPath, "cat-file", "-p", "refs/heads/master")
-
- t.Run("read existing object by ref", func(t *testing.T) {
- reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- object, err := reader.reader(ctx, "refs/heads/master", "commit")
- require.NoError(t, err)
-
- data, err := io.ReadAll(object)
- require.NoError(t, err)
- require.Equal(t, commitContents, data)
- })
-
- t.Run("read existing object by object ID", func(t *testing.T) {
- reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- object, err := reader.reader(ctx, commitID.Revision(), "commit")
- require.NoError(t, err)
-
- data, err := io.ReadAll(object)
- require.NoError(t, err)
-
- require.Contains(t, string(data), "Merge branch 'cherry-pick-ce369011' into 'master'\n")
- })
-
- t.Run("read commit with wrong type", func(t *testing.T) {
- reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- _, err = reader.reader(ctx, commitID.Revision(), "tag")
- require.EqualError(t, err, fmt.Sprintf("expected %s to be a tag, got commit", commitID))
-
- // Verify that we're still able to read a commit after the previous read has failed.
- object, err := reader.reader(ctx, commitID.Revision(), "commit")
- require.NoError(t, err)
-
- data, err := io.ReadAll(object)
- require.NoError(t, err)
-
- require.Equal(t, commitContents, data)
- })
-
- t.Run("read missing ref", func(t *testing.T) {
- reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- _, err = reader.reader(ctx, "refs/heads/does-not-exist", "commit")
- require.EqualError(t, err, "object not found")
-
- // Verify that we're still able to read a commit after the previous read has failed.
- object, err := reader.reader(ctx, commitID.Revision(), "commit")
- require.NoError(t, err)
-
- data, err := io.ReadAll(object)
- require.NoError(t, err)
-
- require.Equal(t, commitContents, data)
- })
-
- t.Run("read fails when not consuming previous object", func(t *testing.T) {
- reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- _, err = reader.reader(ctx, commitID.Revision(), "commit")
- require.NoError(t, err)
-
- // We haven't yet consumed the previous object, so this must now fail.
- _, err = reader.reader(ctx, commitID.Revision(), "commit")
- require.EqualError(t, err, fmt.Sprintf("cannot create new Object: batch contains %d unread bytes", len(commitContents)+1))
- })
-
- t.Run("read fails when partially consuming previous object", func(t *testing.T) {
- reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- object, err := reader.reader(ctx, commitID.Revision(), "commit")
- require.NoError(t, err)
-
- _, err = io.CopyN(io.Discard, object, 100)
- require.NoError(t, err)
-
- // We haven't yet consumed the previous object, so this must now fail.
- _, err = reader.reader(ctx, commitID.Revision(), "commit")
- require.EqualError(t, err, fmt.Sprintf("cannot create new Object: batch contains %d unread bytes", len(commitContents)-100+1))
- })
-
- t.Run("read increments Prometheus counter", func(t *testing.T) {
- counter := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"type"})
-
- reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), counter)
- require.NoError(t, err)
-
- for objectType, revision := range map[string]git.Revision{
- "commit": "refs/heads/master",
- "tree": "refs/heads/master^{tree}",
- "blob": "refs/heads/master:README",
- "tag": "refs/tags/v1.1.1",
- } {
- require.Equal(t, float64(0), testutil.ToFloat64(counter.WithLabelValues(objectType)))
-
- object, err := reader.reader(ctx, revision, objectType)
- require.NoError(t, err)
-
- require.Equal(t, float64(1), testutil.ToFloat64(counter.WithLabelValues(objectType)))
-
- _, err = io.Copy(io.Discard, object)
- require.NoError(t, err)
- }
- })
-}
diff --git a/internal/git/catfile/objectinfo.go b/internal/git/catfile/objectinfo.go
new file mode 100644
index 000000000..b9b4ef609
--- /dev/null
+++ b/internal/git/catfile/objectinfo.go
@@ -0,0 +1,65 @@
+package catfile
+
+import (
+ "bufio"
+ "fmt"
+ "strconv"
+ "strings"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git"
+)
+
+// ObjectInfo represents a header returned by `git cat-file --batch`
+type ObjectInfo struct {
+	Oid  git.ObjectID // object ID, parsed from the first field of the info line
+	Type string       // object type, taken verbatim from the second field of the info line
+	Size int64        // object size, parsed as a base-10 integer from the third field of the info line
+}
+
+// NotFoundError is returned when requesting an object that does not exist. It wraps the error describing the failure.
+type NotFoundError struct{ error }
+
+// IsNotFound tests whether err has type NotFoundError.
+func IsNotFound(err error) bool {
+ _, ok := err.(NotFoundError)
+ return ok
+}
+
+// IsBlob returns true if object type is "blob"
+func (o *ObjectInfo) IsBlob() bool {
+ return o.Type == "blob"
+}
+
+// ParseObjectInfo reads one newline-terminated info line from stdout and turns
+// it into an ObjectInfo.
+//
+// A line ending in " missing" yields a NotFoundError. Any other line must have
+// exactly three space-separated fields: a hex object ID, the object type and
+// the object size as a base-10 integer. Read and parse failures are returned
+// as wrapped errors.
+func ParseObjectInfo(stdout *bufio.Reader) (*ObjectInfo, error) {
+	line, err := stdout.ReadString('\n')
+	if err != nil {
+		return nil, fmt.Errorf("read info line: %w", err)
+	}
+	line = strings.TrimSuffix(line, "\n")
+
+	if strings.HasSuffix(line, " missing") {
+		return nil, NotFoundError{fmt.Errorf("object not found")}
+	}
+
+	fields := strings.Split(line, " ")
+	if len(fields) != 3 {
+		return nil, fmt.Errorf("invalid info line: %q", line)
+	}
+	rawOID, objectType, rawSize := fields[0], fields[1], fields[2]
+
+	oid, err := git.NewObjectIDFromHex(rawOID)
+	if err != nil {
+		return nil, fmt.Errorf("parse object ID: %w", err)
+	}
+
+	size, err := strconv.ParseInt(rawSize, 10, 64)
+	if err != nil {
+		return nil, fmt.Errorf("parse object size: %w", err)
+	}
+
+	info := ObjectInfo{Oid: oid, Type: objectType, Size: size}
+	return &info, nil
+}
diff --git a/internal/git/catfile/object_info_reader_fuzz.go b/internal/git/catfile/objectinfo_fuzz.go
index 130abb4e7..130abb4e7 100644
--- a/internal/git/catfile/object_info_reader_fuzz.go
+++ b/internal/git/catfile/objectinfo_fuzz.go
diff --git a/internal/git/catfile/objectinfo_test.go b/internal/git/catfile/objectinfo_test.go
new file mode 100644
index 000000000..46e9b39e2
--- /dev/null
+++ b/internal/git/catfile/objectinfo_test.go
@@ -0,0 +1,68 @@
+package catfile
+
+import (
+ "bufio"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+// TestParseObjectInfoSuccess checks that ParseObjectInfo parses a well-formed
+// info line and reports a " missing" line as a NotFoundError.
+func TestParseObjectInfoSuccess(t *testing.T) {
+	for _, tc := range []struct {
+		desc     string
+		input    string
+		output   *ObjectInfo
+		notFound bool
+	}{
+		{
+			desc:  "existing object",
+			input: "7c9373883988204e5a9f72c4a5119cbcefc83627 commit 222\n",
+			output: &ObjectInfo{
+				Oid:  "7c9373883988204e5a9f72c4a5119cbcefc83627",
+				Type: "commit",
+				Size: 222,
+			},
+		},
+		{
+			desc:     "non existing object",
+			input:    "bla missing\n",
+			notFound: true,
+		},
+	} {
+		t.Run(tc.desc, func(t *testing.T) {
+			output, err := ParseObjectInfo(bufio.NewReader(strings.NewReader(tc.input)))
+			if tc.notFound {
+				require.True(t, IsNotFound(err), "expect NotFoundError")
+				return
+			}
+
+			require.NoError(t, err)
+			require.Equal(t, tc.output, output)
+		})
+	}
+}
+
+// TestParseObjectInfoErrors checks that ParseObjectInfo returns an error for
+// malformed info lines.
+func TestParseObjectInfoErrors(t *testing.T) {
+	for _, tc := range []struct {
+		desc  string
+		input string
+	}{
+		{desc: "missing newline", input: "7c9373883988204e5a9f72c4a5119cbcefc83627 commit 222"},
+		{desc: "too few words", input: "7c9373883988204e5a9f72c4a5119cbcefc83627 commit\n"},
+		{desc: "too many words", input: "7c9373883988204e5a9f72c4a5119cbcefc83627 commit 222 bla\n"},
+		{desc: "parse object size", input: "7c9373883988204e5a9f72c4a5119cbcefc83627 commit bla\n"},
+	} {
+		t.Run(tc.desc, func(t *testing.T) {
+			_, err := ParseObjectInfo(bufio.NewReader(strings.NewReader(tc.input)))
+
+			require.Error(t, err)
+		})
+	}
+}
diff --git a/internal/git/catfile/testhelper_test.go b/internal/git/catfile/testhelper_test.go
index d78dc8092..f0fe098d2 100644
--- a/internal/git/catfile/testhelper_test.go
+++ b/internal/git/catfile/testhelper_test.go
@@ -1,14 +1,9 @@
package catfile
import (
- "context"
"os"
"testing"
- "gitlab.com/gitlab-org/gitaly/v14/internal/command"
- "gitlab.com/gitlab-org/gitaly/v14/internal/git"
- "gitlab.com/gitlab-org/gitaly/v14/internal/git/repository"
- "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
)
@@ -22,27 +17,3 @@ func testMain(m *testing.M) int {
defer cleanup()
return m.Run()
}
-
-type repoExecutor struct {
- repository.GitRepo
- gitCmdFactory git.CommandFactory
-}
-
-func newRepoExecutor(t *testing.T, cfg config.Cfg, repo repository.GitRepo) git.RepositoryExecutor {
- return &repoExecutor{
- GitRepo: repo,
- gitCmdFactory: git.NewExecCommandFactory(cfg),
- }
-}
-
-func (e *repoExecutor) Exec(ctx context.Context, cmd git.Cmd, opts ...git.CmdOpt) (*command.Command, error) {
- return e.gitCmdFactory.New(ctx, e.GitRepo, cmd, opts...)
-}
-
-func (e *repoExecutor) ExecAndWait(ctx context.Context, cmd git.Cmd, opts ...git.CmdOpt) error {
- command, err := e.Exec(ctx, cmd, opts...)
- if err != nil {
- return err
- }
- return command.Wait()
-}
diff --git a/internal/git/catfile/tracing.go b/internal/git/catfile/tracing.go
deleted file mode 100644
index 77a51304e..000000000
--- a/internal/git/catfile/tracing.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package catfile
-
-import (
- "context"
-
- "github.com/opentracing/opentracing-go"
- "gitlab.com/gitlab-org/gitaly/v14/internal/git"
- "gitlab.com/gitlab-org/labkit/correlation"
-)
-
-func revisionTag(revision git.Revision) opentracing.Tag {
- return opentracing.Tag{Key: "revision", Value: revision}
-}
-
-func correlationIDTag(ctx context.Context) opentracing.Tag {
- return opentracing.Tag{Key: "correlation_id", Value: correlation.ExtractFromContext(ctx)}
-}
-
-func startSpan(innerCtx context.Context, outerCtx context.Context, methodName string, revision git.Revision) func() {
- innerSpan, ctx := opentracing.StartSpanFromContext(innerCtx, methodName, revisionTag(revision))
- outerSpan, _ := opentracing.StartSpanFromContext(outerCtx, methodName, revisionTag(revision), correlationIDTag(ctx))
-
- return func() {
- innerSpan.Finish()
- outerSpan.Finish()
- }
-}