diff options
author | Stan Hu <stanhu@gmail.com> | 2021-09-21 19:13:46 +0300 |
---|---|---|
committer | Stan Hu <stanhu@gmail.com> | 2021-09-21 19:14:09 +0300 |
commit | c5a32bd5a681b448c5aae248a5adfe332037eaca (patch) | |
tree | f8d7d96fc71fb5faddda7d701b8b4e42689f68ca /internal | |
parent | f8946a7c774dc0a2cffec5ec3456024fca6d808c (diff) |
Revert catfile cache refactor
This reverts https://gitlab.com/gitlab-org/gitaly/-/merge_requests/3853
since we believe it's leading to a Goroutine leak in
https://gitlab.com/gitlab-com/gl-infra/production/-/issues/5566.
Diffstat (limited to 'internal')
-rw-r--r-- | internal/command/command.go | 10 | ||||
-rw-r--r-- | internal/git/catfile/batch.go | 220 | ||||
-rw-r--r-- | internal/git/catfile/batch_cache.go | 119 | ||||
-rw-r--r-- | internal/git/catfile/batch_cache_test.go | 278 | ||||
-rw-r--r-- | internal/git/catfile/batch_check_process.go | 67 | ||||
-rw-r--r-- | internal/git/catfile/batch_process.go | 148 | ||||
-rw-r--r-- | internal/git/catfile/batch_test.go | 39 | ||||
-rw-r--r-- | internal/git/catfile/object.go | 13 | ||||
-rw-r--r-- | internal/git/catfile/object_info_reader.go | 143 | ||||
-rw-r--r-- | internal/git/catfile/object_info_reader_test.go | 164 | ||||
-rw-r--r-- | internal/git/catfile/object_reader.go | 174 | ||||
-rw-r--r-- | internal/git/catfile/object_reader_test.go | 137 | ||||
-rw-r--r-- | internal/git/catfile/objectinfo.go | 65 | ||||
-rw-r--r-- | internal/git/catfile/objectinfo_fuzz.go (renamed from internal/git/catfile/object_info_reader_fuzz.go) | 0 | ||||
-rw-r--r-- | internal/git/catfile/objectinfo_test.go | 68 | ||||
-rw-r--r-- | internal/git/catfile/testhelper_test.go | 29 | ||||
-rw-r--r-- | internal/git/catfile/tracing.go | 27 |
17 files changed, 623 insertions, 1078 deletions
diff --git a/internal/command/command.go b/internal/command/command.go index d1db84cd1..eb2d494be 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -234,6 +234,7 @@ func New(ctx context.Context, cmd *exec.Cmd, stdin io.Reader, stdout, stderr io. syscall.Kill(-process.Pid, syscall.SIGTERM) } command.Wait() + wg.Done() }() logPid = cmd.Process.Pid @@ -276,15 +277,6 @@ func (c *Command) wait() { inFlightCommandGauge.Dec() c.logProcessComplete() - - // This is a bit out-of-place here given that the `wg.Add()` call is in `New()`. - // But in `New()`, we have to resort waiting on the context being finished until we - // would be able to decrement the number of in-flight commands. Given that in some - // cases we detach processes from their initial context such that they run in the - // background, this would cause us to take longer than necessary to decrease the - // wait group counter again. So we instead do it here to accelerate the process, - // even though it's less idiomatic. - wg.Done() } // ExitStatus will return the exit-code from an error returned by Wait(). diff --git a/internal/git/catfile/batch.go b/internal/git/catfile/batch.go index 5c11ad015..8bf739775 100644 --- a/internal/git/catfile/batch.go +++ b/internal/git/catfile/batch.go @@ -7,6 +7,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/v14/internal/git" + "gitlab.com/gitlab-org/labkit/correlation" ) const ( @@ -28,69 +29,17 @@ type Batch interface { } type batch struct { - *objectInfoReader - *objectReader - - closedMutex sync.Mutex - closed bool -} - -func newBatch( - ctx context.Context, - repo git.RepositoryExecutor, - counter *prometheus.CounterVec, -) (_ *batch, returnedErr error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.Batch") - go func() { - <-ctx.Done() - span.Finish() - }() - - objectReader, err := newObjectReader(ctx, repo, counter) - if err != nil { - return nil, err - } - defer func() { - // If creation of the ObjectInfoReader fails, then we do not want to leak the - // ObjectReader process. - if returnedErr != nil { - objectReader.Close() - } - }() - - objectInfoReader, err := newObjectInfoReader(ctx, repo, counter) - if err != nil { - return nil, err - } - - return &batch{objectReader: objectReader, objectInfoReader: objectInfoReader}, nil -} - -// Close closes the writers for objectInfoReader and objectReader. This is only used for cached -// Batches -func (c *batch) Close() { - c.closedMutex.Lock() - defer c.closedMutex.Unlock() - - if c.closed { - return - } - - c.closed = true - c.objectReader.Close() - c.objectInfoReader.Close() -} - -func (c *batch) isClosed() bool { - c.closedMutex.Lock() - defer c.closedMutex.Unlock() - return c.closed + sync.Mutex + *batchCheckProcess + *batchProcess + cancel func() + closed bool } // Info returns an ObjectInfo if spec exists. If the revision does not exist // the error is of type NotFoundError. func (c *batch) Info(ctx context.Context, revision git.Revision) (*ObjectInfo, error) { - return c.objectInfoReader.info(ctx, revision) + return c.batchCheckProcess.info(revision) } // Tree returns a raw tree object. It is an error if the revision does not @@ -98,7 +47,7 @@ func (c *batch) Info(ctx context.Context, revision git.Revision) (*ObjectInfo, e // the object type. Caller must consume the Reader before making another call // on C. func (c *batch) Tree(ctx context.Context, revision git.Revision) (*Object, error) { - return c.objectReader.reader(ctx, revision, "tree") + return c.batchProcess.reader(revision, "tree") } // Commit returns a raw commit object. It is an error if the revision does not @@ -106,7 +55,7 @@ func (c *batch) Tree(ctx context.Context, revision git.Revision) (*Object, error // check the object type. Caller must consume the Reader before making another // call on C. func (c *batch) Commit(ctx context.Context, revision git.Revision) (*Object, error) { - return c.objectReader.reader(ctx, revision, "commit") + return c.batchProcess.reader(revision, "commit") } // Blob returns a reader for the requested blob. The entire blob must be @@ -115,11 +64,158 @@ func (c *batch) Commit(ctx context.Context, revision git.Revision) (*Object, err // It is an error if the revision does not point to a blob. To prevent this, // use Info to resolve the revision and check the object type. func (c *batch) Blob(ctx context.Context, revision git.Revision) (*Object, error) { - return c.objectReader.reader(ctx, revision, "blob") + return c.batchProcess.reader(revision, "blob") } // Tag returns a raw tag object. Caller must consume the Reader before // making another call on C. func (c *batch) Tag(ctx context.Context, revision git.Revision) (*Object, error) { - return c.objectReader.reader(ctx, revision, "tag") + return c.batchProcess.reader(revision, "tag") +} + +// Close closes the writers for batchCheckProcess and batchProcess. This is only used for cached +// Batches +func (c *batch) Close() { + c.Lock() + defer c.Unlock() + + if c.closed { + return + } + + c.closed = true + if c.cancel != nil { + // both c.batchProcess and c.batchCheckProcess have goroutines that listen on + // ctx.Done() when this is cancelled, it will cause those goroutines to close both + // writers + c.cancel() + } +} + +func (c *batch) isClosed() bool { + c.Lock() + defer c.Unlock() + return c.closed +} + +type simulatedBatchSpawnError struct{} + +func (simulatedBatchSpawnError) Error() string { return "simulated spawn error" } + +func (bc *BatchCache) newBatch(ctx context.Context, repo git.RepositoryExecutor) (*batch, context.Context, error) { + var err error + + // batch processes are long-lived and reused across RPCs, + // so we de-correlate the process from the RPC + ctx = correlation.ContextWithCorrelation(ctx, "") + ctx = opentracing.ContextWithSpan(ctx, nil) + span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.Batch") + + ctx, cancel := context.WithCancel(ctx) + defer func() { + if err != nil { + cancel() + } + }() + + go func() { + <-ctx.Done() + span.Finish() + }() + + batchProcess, err := bc.newBatchProcess(ctx, repo) + if err != nil { + return nil, ctx, err + } + + batchCheckProcess, err := bc.newBatchCheckProcess(ctx, repo) + if err != nil { + return nil, ctx, err + } + + return &batch{batchProcess: batchProcess, batchCheckProcess: batchCheckProcess}, ctx, nil +} + +func newInstrumentedBatch(ctx context.Context, c Batch, catfileLookupCounter *prometheus.CounterVec) Batch { + return &instrumentedBatch{ + Batch: c, + catfileLookupCounter: catfileLookupCounter, + batchCtx: ctx, + } +} + +// We maintain two contexts here: the one RPC-level one, and one batch-level one. +// +// The batchCtx tracks the lifetime of the long-running batch process, and is +// de-correlated from the RPC, as it is shared between many RPCs. +// +// We perform double accounting and re-correlation to get stats and traces per +// batch process. +type instrumentedBatch struct { + Batch + catfileLookupCounter *prometheus.CounterVec + batchCtx context.Context +} + +func (ib *instrumentedBatch) Info(ctx context.Context, revision git.Revision) (*ObjectInfo, error) { + ctx, finish := ib.startSpan(ctx, "Batch.Info", revision) + defer finish() + + ib.catfileLookupCounter.WithLabelValues("info").Inc() + + return ib.Batch.Info(ctx, revision) +} + +func (ib *instrumentedBatch) Tree(ctx context.Context, revision git.Revision) (*Object, error) { + ctx, finish := ib.startSpan(ctx, "Batch.Tree", revision) + defer finish() + + ib.catfileLookupCounter.WithLabelValues("tree").Inc() + + return ib.Batch.Tree(ctx, revision) +} + +func (ib *instrumentedBatch) Commit(ctx context.Context, revision git.Revision) (*Object, error) { + ctx, finish := ib.startSpan(ctx, "Batch.Commit", revision) + defer finish() + + ib.catfileLookupCounter.WithLabelValues("commit").Inc() + + return ib.Batch.Commit(ctx, revision) +} + +func (ib *instrumentedBatch) Blob(ctx context.Context, revision git.Revision) (*Object, error) { + ctx, finish := ib.startSpan(ctx, "Batch.Blob", revision) + defer finish() + + ib.catfileLookupCounter.WithLabelValues("blob").Inc() + + return ib.Batch.Blob(ctx, revision) +} + +func (ib *instrumentedBatch) Tag(ctx context.Context, revision git.Revision) (*Object, error) { + ctx, finish := ib.startSpan(ctx, "Batch.Tag", revision) + defer finish() + + ib.catfileLookupCounter.WithLabelValues("tag").Inc() + + return ib.Batch.Tag(ctx, revision) +} + +func (ib *instrumentedBatch) revisionTag(revision git.Revision) opentracing.Tag { + return opentracing.Tag{Key: "revision", Value: revision} +} + +func (ib *instrumentedBatch) correlationIDTag(ctx context.Context) opentracing.Tag { + return opentracing.Tag{Key: "correlation_id", Value: correlation.ExtractFromContext(ctx)} +} + +func (ib *instrumentedBatch) startSpan(ctx context.Context, methodName string, revision git.Revision) (context.Context, func()) { + span, ctx := opentracing.StartSpanFromContext(ctx, methodName, ib.revisionTag(revision)) + span2, _ := opentracing.StartSpanFromContext(ib.batchCtx, methodName, ib.revisionTag(revision), ib.correlationIDTag(ctx)) + + return ctx, func() { + span.Finish() + span2.Finish() + } } diff --git a/internal/git/catfile/batch_cache.go b/internal/git/catfile/batch_cache.go index b8e1f966a..336d9db3d 100644 --- a/internal/git/catfile/batch_cache.go +++ b/internal/git/catfile/batch_cache.go @@ -6,13 +6,11 @@ import ( "sync" "time" - "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/v14/internal/git" "gitlab.com/gitlab-org/gitaly/v14/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata" - "gitlab.com/gitlab-org/labkit/correlation" ) const ( @@ -34,18 +32,14 @@ type Cache interface { Evict() } -func newCacheKey(sessionID string, repo repository.GitRepo) (key, bool) { - if sessionID == "" { - return key{}, false - } - +func newCacheKey(sessionID string, repo repository.GitRepo) key { return key{ sessionID: sessionID, repoStorage: repo.GetStorageName(), repoRelPath: repo.GetRelativePath(), repoObjDir: repo.GetGitObjectDirectory(), repoAltDir: strings.Join(repo.GetGitAlternateObjectDirectories(), ","), - }, true + } } type key struct { @@ -71,6 +65,9 @@ type BatchCache struct { maxLen int // ttl is the fixed ttl for cache entries ttl time.Duration + // injectSpawnErrors is used for testing purposes only. If set to true, then spawned batch + // processes will simulate spawn errors. + injectSpawnErrors bool // monitorTicker is the ticker used for the monitoring Goroutine. monitorTicker *time.Ticker monitorDone chan interface{} @@ -83,11 +80,6 @@ type BatchCache struct { entriesMutex sync.Mutex entries []*entry - - // cachedProcessDone is a condition that gets signalled whenever a process is being - // considered to be returned to the cache. This field is optional and must only be used in - // tests. - cachedProcessDone *sync.Cond } // NewCache creates a new catfile process cache. @@ -95,6 +87,15 @@ func NewCache(cfg config.Cfg) *BatchCache { return newCache(defaultBatchfileTTL, cfg.Git.CatfileCacheSize, defaultEvictionInterval) } +// Stop stops the monitoring Goroutine and evicts all cached processes. This must only be called +// once. +func (bc *BatchCache) Stop() { + bc.monitorTicker.Stop() + bc.monitorDone <- struct{}{} + <-bc.monitorDone + bc.Evict() +} + func newCache(ttl time.Duration, maxLen int, refreshInterval time.Duration) *BatchCache { if maxLen <= 0 { maxLen = defaultMaxLen @@ -169,91 +170,47 @@ func (bc *BatchCache) monitor() { } } -// Stop stops the monitoring Goroutine and evicts all cached processes. This must only be called -// once. -func (bc *BatchCache) Stop() { - bc.monitorTicker.Stop() - bc.monitorDone <- struct{}{} - <-bc.monitorDone - bc.Evict() -} - // BatchProcess creates a new Batch process for the given repository. -func (bc *BatchCache) BatchProcess(ctx context.Context, repo git.RepositoryExecutor) (_ Batch, returnedErr error) { - requestDone := ctx.Done() - if requestDone == nil { +func (bc *BatchCache) BatchProcess(ctx context.Context, repo git.RepositoryExecutor) (Batch, error) { + if ctx.Done() == nil { panic("empty ctx.Done() in catfile.Batch.New()") } - cacheKey, isCacheable := newCacheKey(metadata.GetValue(ctx, SessionIDField), repo) - if isCacheable { - // We only try to look up cached batch processes in case it is cacheable, which - // requires a session ID. This is mostly done such that git-cat-file(1) processes - // from one user cannot interfer with those from another user. The main intent is to - // disallow trivial denial of service attacks against other users in case it is - // possible to poison the cache with broken git-cat-file(1) processes. - - if c, ok := bc.checkout(cacheKey); ok { - go bc.returnWhenDone(requestDone, cacheKey, c) - return c, nil + sessionID := metadata.GetValue(ctx, SessionIDField) + if sessionID == "" { + c, ctx, err := bc.newBatch(ctx, repo) + if err != nil { + return nil, err } + return newInstrumentedBatch(ctx, c, bc.catfileLookupCounter), err + } - // We have not found any cached process, so we need to create a new one. In this - // case, we need to detach the process from the current context such that it does - // not get killed when the current context is done. Note that while we explicitly - // `Close()` processes in case this function fails, we must have a cancellable - // context or otherwise our `command` package will panic. - var cancel func() - ctx, cancel = context.WithCancel(context.Background()) - defer func() { - if returnedErr != nil { - cancel() - } - }() - - // We have to decorrelate the process from the current context given that it - // may potentially be reused across different RPC calls. - ctx = correlation.ContextWithCorrelation(ctx, "") - ctx = opentracing.ContextWithSpan(ctx, nil) + cacheKey := newCacheKey(sessionID, repo) + requestDone := ctx.Done() + + if c, ok := bc.checkout(cacheKey); ok { + go bc.returnWhenDone(requestDone, cacheKey, c) + return newInstrumentedBatch(ctx, c, bc.catfileLookupCounter), nil } - c, err := newBatch(ctx, repo, bc.catfileLookupCounter) + // if we are using caching, create a fresh context for the new batch + // and initialize the new batch with a bc key and cancel function + cacheCtx, cacheCancel := context.WithCancel(context.Background()) + c, ctx, err := bc.newBatch(cacheCtx, repo) if err != nil { + cacheCancel() return nil, err } - defer func() { - // If we somehow fail after creating a new Batch process, then we want to kill - // spawned processes right away. - if returnedErr != nil { - c.Close() - } - }() - - bc.totalCatfileProcesses.Inc() - bc.currentCatfileProcesses.Inc() - go func() { - <-ctx.Done() - bc.currentCatfileProcesses.Dec() - }() - - if isCacheable { - // If the process is cacheable, then we want to put the process into the cache when - // the current outer context is done. - go bc.returnWhenDone(requestDone, cacheKey, c) - } - return c, nil + c.cancel = cacheCancel + go bc.returnWhenDone(requestDone, cacheKey, c) + + return newInstrumentedBatch(ctx, c, bc.catfileLookupCounter), nil } func (bc *BatchCache) returnWhenDone(done <-chan struct{}, cacheKey key, c *batch) { <-done - if bc.cachedProcessDone != nil { - defer func() { - bc.cachedProcessDone.Broadcast() - }() - } - if c == nil || c.isClosed() { return } diff --git a/internal/git/catfile/batch_cache_test.go b/internal/git/catfile/batch_cache_test.go index 9816c0d7d..885978a7c 100644 --- a/internal/git/catfile/batch_cache_test.go +++ b/internal/git/catfile/batch_cache_test.go @@ -1,87 +1,70 @@ package catfile import ( - "context" - "errors" - "io" - "os" - "sync" + "fmt" "testing" "time" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v14/internal/git/repository" - "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" - "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" - "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" - "gitlab.com/gitlab-org/labkit/correlation" - "google.golang.org/grpc/metadata" ) -func TestCache_add(t *testing.T) { +func TestCacheAdd(t *testing.T) { const maxLen = 3 bc := newCache(time.Hour, maxLen, defaultEvictionInterval) - cfg, repo, _ := testcfg.BuildWithRepo(t) - - key0 := mustCreateKey(t, "0", repo) - value0 := mustCreateBatch(t, cfg, repo) + key0 := testKey(0) + value0 := testValue() bc.add(key0, value0) requireCacheValid(t, bc) - key1 := mustCreateKey(t, "1", repo) - bc.add(key1, mustCreateBatch(t, cfg, repo)) + key1 := testKey(1) + bc.add(key1, testValue()) requireCacheValid(t, bc) - key2 := mustCreateKey(t, "2", repo) - bc.add(key2, mustCreateBatch(t, cfg, repo)) + key2 := testKey(2) + bc.add(key2, testValue()) requireCacheValid(t, bc) // Because maxLen is 3, and key0 is oldest, we expect that adding key3 // will kick out key0. - key3 := mustCreateKey(t, "3", repo) - bc.add(key3, mustCreateBatch(t, cfg, repo)) + key3 := testKey(3) + bc.add(key3, testValue()) requireCacheValid(t, bc) require.Equal(t, maxLen, bc.len(), "length should be maxLen") require.True(t, value0.isClosed(), "value0 should be closed") - require.Equal(t, []key{key1, key2, key3}, keys(t, bc)) + require.Equal(t, []key{key1, key2, key3}, keys(bc)) } -func TestCache_addTwice(t *testing.T) { +func TestCacheAddTwice(t *testing.T) { bc := newCache(time.Hour, 10, defaultEvictionInterval) - cfg, repo, _ := testcfg.BuildWithRepo(t) - - key0 := mustCreateKey(t, "0", repo) - value0 := mustCreateBatch(t, cfg, repo) + key0 := testKey(0) + value0 := testValue() bc.add(key0, value0) requireCacheValid(t, bc) - key1 := mustCreateKey(t, "1", repo) - value1 := mustCreateBatch(t, cfg, repo) - bc.add(key1, value1) + key1 := testKey(1) + bc.add(key1, testValue()) requireCacheValid(t, bc) require.Equal(t, key0, bc.head().key, "key0 should be oldest key") - value2 := mustCreateBatch(t, cfg, repo) + value2 := testValue() bc.add(key0, value2) requireCacheValid(t, bc) require.Equal(t, key1, bc.head().key, "key1 should be oldest key") - require.Equal(t, value1, bc.head().value) + require.Equal(t, value2, bc.head().value) require.True(t, value0.isClosed(), "value0 should be closed") } -func TestCache_checkout(t *testing.T) { +func TestCacheCheckout(t *testing.T) { bc := newCache(time.Hour, 10, defaultEvictionInterval) - cfg, repo, _ := testcfg.BuildWithRepo(t) - - key0 := mustCreateKey(t, "0", repo) - value0 := mustCreateBatch(t, cfg, repo) + key0 := testKey(0) + value0 := testValue() bc.add(key0, value0) v, ok := bc.checkout(key{sessionID: "foo"}) @@ -102,33 +85,31 @@ func TestCache_checkout(t *testing.T) { require.Nil(t, v, "value from second checkout") } -func TestCache_enforceTTL(t *testing.T) { +func TestCacheEnforceTTL(t *testing.T) { ttl := time.Hour bc := newCache(ttl, 10, defaultEvictionInterval) - cfg, repo, _ := testcfg.BuildWithRepo(t) - sleep := func() { time.Sleep(2 * time.Millisecond) } - key0 := mustCreateKey(t, "0", repo) - value0 := mustCreateBatch(t, cfg, repo) + key0 := testKey(0) + value0 := testValue() bc.add(key0, value0) sleep() - key1 := mustCreateKey(t, "1", repo) - value1 := mustCreateBatch(t, cfg, repo) + key1 := testKey(1) + value1 := testValue() bc.add(key1, value1) sleep() cutoff := time.Now().Add(ttl) sleep() - key2 := mustCreateKey(t, "2", repo) - bc.add(key2, mustCreateBatch(t, cfg, repo)) + key2 := testKey(2) + bc.add(key2, testValue()) sleep() - key3 := mustCreateKey(t, "3", repo) - bc.add(key3, mustCreateBatch(t, cfg, repo)) + key3 := testKey(3) + bc.add(key3, testValue()) sleep() requireCacheValid(t, bc) @@ -142,198 +123,40 @@ func TestCache_enforceTTL(t *testing.T) { require.True(t, v.isClosed(), "value %d %v should be closed", i, v) } - require.Equal(t, []key{key2, key3}, keys(t, bc), "remaining keys after EnforceTTL") + require.Equal(t, []key{key2, key3}, keys(bc), "remaining keys after EnforceTTL") bc.enforceTTL(cutoff) requireCacheValid(t, bc) - require.Equal(t, []key{key2, key3}, keys(t, bc), "remaining keys after second EnforceTTL") + require.Equal(t, []key{key2, key3}, keys(bc), "remaining keys after second EnforceTTL") } -func TestCache_autoExpiry(t *testing.T) { +func TestAutoExpiry(t *testing.T) { ttl := 5 * time.Millisecond refresh := 1 * time.Millisecond bc := newCache(ttl, 10, refresh) - cfg, repo, _ := testcfg.BuildWithRepo(t) - - key0 := mustCreateKey(t, "0", repo) - value0 := mustCreateBatch(t, cfg, repo) + key0 := testKey(0) + value0 := testValue() bc.add(key0, value0) requireCacheValid(t, bc) - require.Contains(t, keys(t, bc), key0, "key should still be in map") + require.Contains(t, keys(bc), key0, "key should still be in map") require.False(t, value0.isClosed(), "value should not have been closed") // Wait for the monitor goroutine to do its thing for i := 0; i < 100; i++ { - if len(keys(t, bc)) == 0 { + if len(keys(bc)) == 0 { break } time.Sleep(refresh) } - require.Empty(t, keys(t, bc), "key should no longer be in map") + require.Empty(t, keys(bc), "key should no longer be in map") require.True(t, value0.isClosed(), "value should be closed after eviction") } -func TestCache_BatchProcess(t *testing.T) { - cfg, repo, _ := testcfg.BuildWithRepo(t) - repoExecutor := newRepoExecutor(t, cfg, repo) - - cache := newCache(time.Hour, 10, time.Hour) - defer cache.Evict() - cache.cachedProcessDone = sync.NewCond(&sync.Mutex{}) - - t.Run("uncancellable", func(t *testing.T) { - ctx := context.Background() - - require.PanicsWithValue(t, "empty ctx.Done() in catfile.Batch.New()", func() { - _, _ = cache.BatchProcess(ctx, repoExecutor) - }) - }) - - t.Run("uncacheable", func(t *testing.T) { - ctx, cancel := testhelper.Context() - defer cancel() - ctx = correlation.ContextWithCorrelation(ctx, "1") - - // The context doesn't carry a session ID and is thus uncacheable. - // The process should never get returned to the cache and must be - // killed on context cancellation. - batchProcess, err := cache.BatchProcess(ctx, repoExecutor) - require.NoError(t, err) - - batch, ok := batchProcess.(*batch) - require.True(t, ok, "expected batch") - - correlation := correlation.ExtractFromContext(batch.objectReader.creationCtx) - require.Equal(t, "1", correlation) - - cancel() - - // We're cheating a bit here to avoid creating a racy test by reaching into the - // batch processes and trying to read from their stdout. If the cancel did kill the - // process as expected, then the stdout should be closed and we'll get an EOF. - for _, reader := range []io.Reader{batch.objectInfoReader.cmd, batch.objectReader.cmd} { - output, err := io.ReadAll(reader) - if err != nil { - require.True(t, errors.Is(err, os.ErrClosed)) - } else { - require.NoError(t, err) - } - require.Empty(t, output) - } - - // This is another bug: while we do not have any resource leaks because processes - // got killed as expected, the batch itself is not considered to have been closed. - require.False(t, batch.isClosed()) - - require.Empty(t, keys(t, cache)) - }) - - t.Run("cacheable", func(t *testing.T) { - defer cache.Evict() - - ctx, cancel := testhelper.Context() - defer cancel() - ctx = correlation.ContextWithCorrelation(ctx, "1") - ctx = testhelper.MergeIncomingMetadata(ctx, - metadata.Pairs(SessionIDField, "1"), - ) - - batchProcess, err := cache.BatchProcess(ctx, repoExecutor) - require.NoError(t, err) - - batch, ok := batchProcess.(*batch) - require.True(t, ok, "expected instrumented batch") - - // The correlation ID must be empty given that this will be a cached long-running - // processes that can be reused across multpile RPC calls. - correlation := correlation.ExtractFromContext(batch.objectReader.creationCtx) - require.Empty(t, correlation) - - // Cancel the context such that the process will be considered for return to the - // cache and wait for the cache to collect it. - cache.cachedProcessDone.L.Lock() - cancel() - defer cache.cachedProcessDone.L.Unlock() - cache.cachedProcessDone.Wait() - - keys := keys(t, cache) - require.Equal(t, []key{{ - sessionID: "1", - repoStorage: repo.GetStorageName(), - repoRelPath: repo.GetRelativePath(), - }}, keys) - - // Assert that we can still read from the cached process. - _, err = batchProcess.Info(ctx, "refs/heads/master") - require.NoError(t, err) - }) - - t.Run("dirty process does not get cached", func(t *testing.T) { - defer cache.Evict() - - ctx, cancel := testhelper.Context() - defer cancel() - ctx = testhelper.MergeIncomingMetadata(ctx, - metadata.Pairs(SessionIDField, "1"), - ) - - batchProcess, err := cache.BatchProcess(ctx, repoExecutor) - require.NoError(t, err) - - // While we request object data, we do not consume it at all. The reader is thus - // dirty and cannot be reused and shouldn't be returned to the cache. - _, err = batchProcess.Commit(ctx, "refs/heads/master") - require.NoError(t, err) - - // Cancel the context such that the process will be considered for return to the - // cache and wait for the cache to collect it. - cache.cachedProcessDone.L.Lock() - cancel() - defer cache.cachedProcessDone.L.Unlock() - cache.cachedProcessDone.Wait() - - require.Empty(t, keys(t, cache)) - - // The process should be killed now. - _, err = batchProcess.Info(ctx, "refs/heads/master") - require.True(t, errors.Is(err, os.ErrClosed)) - }) - - t.Run("closed process does not get cached", func(t *testing.T) { - defer cache.Evict() - - ctx, cancel := testhelper.Context() - defer cancel() - ctx = testhelper.MergeIncomingMetadata(ctx, - metadata.Pairs(SessionIDField, "1"), - ) - - batchProcess, err := cache.BatchProcess(ctx, repoExecutor) - require.NoError(t, err) - - batch, ok := batchProcess.(*batch) - require.True(t, ok, "expected batch") - - // Closed processes naturally cannot be reused anymore and thus shouldn't ever get - // cached. - batch.Close() - - // Cancel the context such that the process will be considered for return to the - // cache and wait for the cache to collect it. - cache.cachedProcessDone.L.Lock() - cancel() - defer cache.cachedProcessDone.L.Unlock() - cache.cachedProcessDone.Wait() - - require.Empty(t, keys(t, cache)) - }) -} - func requireCacheValid(t *testing.T, bc *BatchCache) { bc.entriesMutex.Lock() defer bc.entriesMutex.Unlock() @@ -344,30 +167,11 @@ func requireCacheValid(t *testing.T, bc *BatchCache) { } } -func mustCreateBatch(t *testing.T, cfg config.Cfg, repo repository.GitRepo) *batch { - t.Helper() - - ctx, cancel := testhelper.Context() - t.Cleanup(cancel) - - batch, err := newBatch(ctx, newRepoExecutor(t, cfg, repo), nil) - require.NoError(t, err) - - return batch -} - -func mustCreateKey(t *testing.T, sessionID string, repo repository.GitRepo) key { - t.Helper() - - key, cacheable := newCacheKey(sessionID, repo) - require.True(t, cacheable) - - return key -} +func testValue() *batch { return &batch{} } -func keys(t *testing.T, bc *BatchCache) []key { - t.Helper() +func testKey(i int) key { return key{sessionID: fmt.Sprintf("key-%d", i)} } +func keys(bc *BatchCache) []key { bc.entriesMutex.Lock() defer bc.entriesMutex.Unlock() diff --git a/internal/git/catfile/batch_check_process.go b/internal/git/catfile/batch_check_process.go new file mode 100644 index 000000000..2ff5256e5 --- /dev/null +++ b/internal/git/catfile/batch_check_process.go @@ -0,0 +1,67 @@ +package catfile + +import ( + "bufio" + "context" + "fmt" + "io" + "sync" + + "github.com/opentracing/opentracing-go" + "gitlab.com/gitlab-org/gitaly/v14/internal/git" +) + +// batchCheckProcess encapsulates a 'git cat-file --batch-check' process +type batchCheckProcess struct { + r *bufio.Reader + w io.WriteCloser + sync.Mutex +} + +func (bc *BatchCache) newBatchCheckProcess(ctx context.Context, repo git.RepositoryExecutor) (*batchCheckProcess, error) { + process := &batchCheckProcess{} + + var stdinReader io.Reader + stdinReader, process.w = io.Pipe() + + span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.BatchCheckProcess") + + batchCmd, err := repo.Exec(ctx, + git.SubCmd{ + Name: "cat-file", + Flags: []git.Option{ + git.Flag{Name: "--batch-check"}, + }, + }, + git.WithStdin(stdinReader), + ) + if err != nil { + return nil, err + } + + process.r = bufio.NewReader(batchCmd) + go func() { + <-ctx.Done() + // This is crucial to prevent leaking file descriptors. + process.w.Close() + span.Finish() + }() + + if bc.injectSpawnErrors { + // Testing only: intentionally leak process + return nil, &simulatedBatchSpawnError{} + } + + return process, nil +} + +func (bc *batchCheckProcess) info(revision git.Revision) (*ObjectInfo, error) { + bc.Lock() + defer bc.Unlock() + + if _, err := fmt.Fprintln(bc.w, revision.String()); err != nil { + return nil, err + } + + return ParseObjectInfo(bc.r) +} diff --git a/internal/git/catfile/batch_process.go b/internal/git/catfile/batch_process.go new file mode 100644 index 000000000..30a79c963 --- /dev/null +++ b/internal/git/catfile/batch_process.go @@ -0,0 +1,148 @@ +package catfile + +import ( + "bufio" + "context" + "fmt" + "io" + "io/ioutil" + "sync" + + "github.com/opentracing/opentracing-go" + "gitlab.com/gitlab-org/gitaly/v14/internal/git" +) + +// batch encapsulates a 'git cat-file --batch' process +type batchProcess struct { + r *bufio.Reader + w io.WriteCloser + + // n is a state machine that tracks how much data we still have to read + // from r. Legal states are: n==0, this means we can do a new request on + // the cat-file process. n==1, this means that we have to discard a + // trailing newline. n>0, this means we are in the middle of reading a + // raw git object. + n int64 + + // Even though the batch type should not be used concurrently, I think + // that if that does happen by mistake we should give proper errors + // instead of doing unsafe memory writes (to n) and failing in some + // unpredictable way. + sync.Mutex +} + +func (bc *BatchCache) newBatchProcess(ctx context.Context, repo git.RepositoryExecutor) (*batchProcess, error) { + bc.totalCatfileProcesses.Inc() + b := &batchProcess{} + + var stdinReader io.Reader + stdinReader, b.w = io.Pipe() + + span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.BatchProcess") + + batchCmd, err := repo.Exec(ctx, + git.SubCmd{ + Name: "cat-file", + Flags: []git.Option{ + git.Flag{Name: "--batch"}, + }, + }, + git.WithStdin(stdinReader), + ) + if err != nil { + return nil, err + } + + b.r = bufio.NewReader(batchCmd) + + bc.currentCatfileProcesses.Inc() + go func() { + <-ctx.Done() + // This Close() is crucial to prevent leaking file descriptors. + b.w.Close() + bc.currentCatfileProcesses.Dec() + span.Finish() + }() + + if bc.injectSpawnErrors { + // Testing only: intentionally leak process + return nil, &simulatedBatchSpawnError{} + } + + return b, nil +} + +func (b *batchProcess) reader(revision git.Revision, expectedType string) (*Object, error) { + b.Lock() + defer b.Unlock() + + if b.n == 1 { + // Consume linefeed + if _, err := b.r.ReadByte(); err != nil { + return nil, err + } + b.n-- + } + + if b.n != 0 { + return nil, fmt.Errorf("cannot create new Object: batch contains %d unread bytes", b.n) + } + + if _, err := fmt.Fprintln(b.w, revision.String()); err != nil { + return nil, err + } + + oi, err := ParseObjectInfo(b.r) + if err != nil { + return nil, err + } + + b.n = oi.Size + 1 + + if oi.Type != expectedType { + // This is a programmer error and it should never happen. But if it does, + // we need to leave the cat-file process in a good state + if _, err := io.CopyN(ioutil.Discard, b.r, b.n); err != nil { + return nil, err + } + b.n = 0 + + return nil, NotFoundError{error: fmt.Errorf("expected %s to be a %s, got %s", oi.Oid, expectedType, oi.Type)} + } + + return &Object{ + ObjectInfo: *oi, + Reader: &batchReader{ + batchProcess: b, + r: io.LimitReader(b.r, oi.Size), + }, + }, nil +} + +func (b *batchProcess) consume(nBytes int) { + b.n -= int64(nBytes) + if b.n < 1 { + panic("too many bytes read from batch") + } +} + +func (b *batchProcess) hasUnreadData() bool { + b.Lock() + defer b.Unlock() + + return b.n > 1 +} + +type batchReader struct { + *batchProcess + r io.Reader +} + +func (br *batchReader) Read(p []byte) (int, error) { + br.batchProcess.Lock() + defer br.batchProcess.Unlock() + + n, err := br.r.Read(p) + br.batchProcess.consume(n) + return n, err +} diff --git a/internal/git/catfile/batch_test.go b/internal/git/catfile/batch_test.go index 95e62cb96..19710e900 100644 --- a/internal/git/catfile/batch_test.go +++ b/internal/git/catfile/batch_test.go @@ -3,7 +3,6 @@ package catfile import ( "bytes" "context" - "fmt" "io" "os" "os/exec" @@ -14,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/command" "gitlab.com/gitlab-org/gitaly/v14/internal/git" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" @@ -22,11 +22,30 @@ import ( "google.golang.org/grpc/metadata" ) +type repoExecutor struct { + repository.GitRepo + gitCmdFactory git.CommandFactory +} + +func (e *repoExecutor) Exec(ctx context.Context, cmd git.Cmd, opts ...git.CmdOpt) (*command.Command, error) { + return e.gitCmdFactory.New(ctx, e.GitRepo, cmd, opts...) +} + +func (e *repoExecutor) ExecAndWait(ctx context.Context, cmd git.Cmd, opts ...git.CmdOpt) error { + command, err := e.Exec(ctx, cmd, opts...) + if err != nil { + return err + } + return command.Wait() +} + func setupBatch(t *testing.T, ctx context.Context) (config.Cfg, Batch, *gitalypb.Repository) { t.Helper() cfg, repo, _ := testcfg.BuildWithRepo(t) - repoExecutor := newRepoExecutor(t, cfg, repo) + repoExecutor := &repoExecutor{ + GitRepo: repo, gitCmdFactory: git.NewExecCommandFactory(cfg), + } cache := newCache(1*time.Hour, 1000, defaultEvictionInterval) batch, err := cache.BatchProcess(ctx, repoExecutor) @@ -327,14 +346,6 @@ func TestRepeatedCalls(t *testing.T) { require.Equal(t, string(treeBytes), string(tree2)) } -type failingTestRepoExecutor struct { - git.RepositoryExecutor -} - -func (e failingTestRepoExecutor) Exec(context.Context, git.Cmd, ...git.CmdOpt) (*command.Command, error) { - return nil, fmt.Errorf("simulated error") -} - func TestSpawnFailure(t *testing.T) { cfg, testRepo, _ := testcfg.BuildWithRepo(t) testRepoExecutor := &repoExecutor{ @@ -380,12 +391,10 @@ func TestSpawnFailure(t *testing.T) { ctx2, cancel2 := testhelper.Context() defer cancel2() - failingTestRepoExecutor := failingTestRepoExecutor{ - RepositoryExecutor: testRepoExecutor, - } - _, err = catfileWithFreshSessionID(ctx2, cache, failingTestRepoExecutor) + cache.injectSpawnErrors = true + _, err = catfileWithFreshSessionID(ctx2, cache, testRepoExecutor) require.Error(t, err, "expect simulated error") - require.EqualError(t, err, "simulated error") + require.IsType(t, &simulatedBatchSpawnError{}, err) require.True( t, diff --git a/internal/git/catfile/object.go b/internal/git/catfile/object.go new file mode 100644 index 000000000..04363ad21 --- /dev/null +++ b/internal/git/catfile/object.go @@ -0,0 +1,13 @@ +package catfile + +import ( + "io" +) + +// Object represents data returned by `git cat-file --batch` +type Object struct { + // ObjectInfo represents main information about object + ObjectInfo + // Reader provides raw data about object. It differs for each type of object(tag, commit, tree, log, etc.) + io.Reader +} diff --git a/internal/git/catfile/object_info_reader.go b/internal/git/catfile/object_info_reader.go deleted file mode 100644 index 75d38c01e..000000000 --- a/internal/git/catfile/object_info_reader.go +++ /dev/null @@ -1,143 +0,0 @@ -package catfile - -import ( - "bufio" - "context" - "fmt" - "strconv" - "strings" - "sync" - - "github.com/opentracing/opentracing-go" - "github.com/prometheus/client_golang/prometheus" - "gitlab.com/gitlab-org/gitaly/v14/internal/command" - "gitlab.com/gitlab-org/gitaly/v14/internal/git" -) - -// ObjectInfo represents a header returned by `git cat-file --batch` -type ObjectInfo struct { - Oid git.ObjectID - Type string - Size int64 -} - -// NotFoundError is returned when requesting an object that does not exist. -type NotFoundError struct{ error } - -// IsNotFound tests whether err has type NotFoundError. -func IsNotFound(err error) bool { - _, ok := err.(NotFoundError) - return ok -} - -// IsBlob returns true if object type is "blob" -func (o *ObjectInfo) IsBlob() bool { - return o.Type == "blob" -} - -// ParseObjectInfo reads from a reader and parses the data into an ObjectInfo struct -func ParseObjectInfo(stdout *bufio.Reader) (*ObjectInfo, error) { - infoLine, err := stdout.ReadString('\n') - if err != nil { - return nil, fmt.Errorf("read info line: %w", err) - } - - infoLine = strings.TrimSuffix(infoLine, "\n") - if strings.HasSuffix(infoLine, " missing") { - return nil, NotFoundError{fmt.Errorf("object not found")} - } - - info := strings.Split(infoLine, " ") - if len(info) != 3 { - return nil, fmt.Errorf("invalid info line: %q", infoLine) - } - - oid, err := git.NewObjectIDFromHex(info[0]) - if err != nil { - return nil, fmt.Errorf("parse object ID: %w", err) - } - - objectSize, err := strconv.ParseInt(info[2], 10, 64) - if err != nil { - return nil, fmt.Errorf("parse object size: %w", err) - } - - return &ObjectInfo{ - Oid: oid, - Type: info[1], - Size: objectSize, - }, nil -} - -// objectInfoReader is a reader for Git object information. This reader is implemented via a -// long-lived `git cat-file --batch-check` process such that we do not have to spawn a separate -// process per object info we're about to read. -type objectInfoReader struct { - cmd *command.Command - stdout *bufio.Reader - sync.Mutex - - // creationCtx is the context in which this reader has been created. This context may - // potentially be decorrelated from the "real" RPC context in case the reader is going to be - // cached. - creationCtx context.Context - counter *prometheus.CounterVec -} - -func newObjectInfoReader( - ctx context.Context, - repo git.RepositoryExecutor, - counter *prometheus.CounterVec, -) (*objectInfoReader, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.ObjectInfoReader") - - batchCmd, err := repo.Exec(ctx, - git.SubCmd{ - Name: "cat-file", - Flags: []git.Option{ - git.Flag{Name: "--batch-check"}, - }, - }, - git.WithStdin(command.SetupStdin), - ) - if err != nil { - return nil, err - } - - objectInfoReader := &objectInfoReader{ - cmd: batchCmd, - stdout: bufio.NewReader(batchCmd), - creationCtx: ctx, - counter: counter, - } - go func() { - <-ctx.Done() - // This is crucial to prevent leaking file descriptors. - objectInfoReader.Close() - span.Finish() - }() - - return objectInfoReader, nil -} - -func (o *objectInfoReader) Close() { - _ = o.cmd.Wait() -} - -func (o *objectInfoReader) info(ctx context.Context, revision git.Revision) (*ObjectInfo, error) { - finish := startSpan(o.creationCtx, ctx, "Batch.Info", revision) - defer finish() - - if o.counter != nil { - o.counter.WithLabelValues("info").Inc() - } - - o.Lock() - defer o.Unlock() - - if _, err := fmt.Fprintln(o.cmd, revision.String()); err != nil { - return nil, err - } - - return ParseObjectInfo(o.stdout) -} diff --git a/internal/git/catfile/object_info_reader_test.go b/internal/git/catfile/object_info_reader_test.go deleted file mode 100644 index e2c6776cc..000000000 --- a/internal/git/catfile/object_info_reader_test.go +++ /dev/null @@ -1,164 +0,0 @@ -package catfile - -import ( - "bufio" - "fmt" - "strings" - "testing" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v14/internal/git" - "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" - "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text" - "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" - "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" -) - -func TestParseObjectInfoSuccess(t *testing.T) { - testCases := []struct { - desc string - input string - output *ObjectInfo - notFound bool - }{ - { - desc: "existing object", - input: "7c9373883988204e5a9f72c4a5119cbcefc83627 commit 222\n", - output: &ObjectInfo{ - Oid: "7c9373883988204e5a9f72c4a5119cbcefc83627", - Type: "commit", - Size: 222, - }, - }, - { - desc: "non existing object", - input: "bla missing\n", - notFound: true, - }, - } - - for _, tc := range testCases { - t.Run(tc.desc, func(t *testing.T) { - reader := bufio.NewReader(strings.NewReader(tc.input)) - output, err := ParseObjectInfo(reader) - if tc.notFound { - require.True(t, IsNotFound(err), "expect NotFoundError") - return - } - - require.NoError(t, err) - require.Equal(t, tc.output, output) - }) - } -} - -func TestParseObjectInfoErrors(t *testing.T) { - testCases := []struct { - desc string - input string - }{ - {desc: "missing newline", input: "7c9373883988204e5a9f72c4a5119cbcefc83627 commit 222"}, - {desc: "too few words", input: "7c9373883988204e5a9f72c4a5119cbcefc83627 commit\n"}, - {desc: "too many words", input: "7c9373883988204e5a9f72c4a5119cbcefc83627 commit 222 bla\n"}, - {desc: "parse object size", input: "7c9373883988204e5a9f72c4a5119cbcefc83627 commit bla\n"}, - } - - for _, tc := range testCases { - t.Run(tc.desc, func(t *testing.T) { - reader := bufio.NewReader(strings.NewReader(tc.input)) - _, err := ParseObjectInfo(reader) - - require.Error(t, err) - }) - } -} - -func TestObjectInfoReader(t *testing.T) { - ctx, cancel := testhelper.Context() - defer cancel() - - cfg, repoProto, repoPath := testcfg.BuildWithRepo(t) - - oiByRevision := make(map[string]*ObjectInfo) - for _, revision := range []string{ - "refs/heads/master", - "refs/heads/master^{tree}", - "refs/heads/master:README", - "refs/tags/v1.1.1", - } { - revParseOutput := gittest.Exec(t, cfg, "-C", repoPath, "rev-parse", revision) - objectID, err := git.NewObjectIDFromHex(text.ChompBytes(revParseOutput)) - require.NoError(t, err) - - objectType := text.ChompBytes(gittest.Exec(t, cfg, "-C", repoPath, "cat-file", "-t", revision)) - objectContents := gittest.Exec(t, cfg, "-C", repoPath, "cat-file", objectType, revision) - - oiByRevision[revision] = &ObjectInfo{ - Oid: objectID, - Type: objectType, - Size: int64(len(objectContents)), - } - } - - for _, tc := range []struct { - desc string - revision git.Revision - expectedErr error - expectedInfo *ObjectInfo - }{ - { - desc: "commit by ref", - revision: "refs/heads/master", - expectedInfo: oiByRevision["refs/heads/master"], - }, - { - desc: "commit by ID", - revision: oiByRevision["refs/heads/master"].Oid.Revision(), - expectedInfo: oiByRevision["refs/heads/master"], - }, - { - desc: "tree", - revision: oiByRevision["refs/heads/master^{tree}"].Oid.Revision(), - expectedInfo: oiByRevision["refs/heads/master^{tree}"], - }, - { - desc: "blob", - revision: oiByRevision["refs/heads/master:README"].Oid.Revision(), - expectedInfo: oiByRevision["refs/heads/master:README"], - }, - { - desc: "tag", - revision: oiByRevision["refs/tags/v1.1.1"].Oid.Revision(), - expectedInfo: oiByRevision["refs/tags/v1.1.1"], - }, - { - desc: "nonexistent ref", - revision: "refs/heads/does-not-exist", - expectedErr: NotFoundError{fmt.Errorf("object not found")}, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - counter := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"type"}) - - reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), counter) - require.NoError(t, err) - - require.Equal(t, float64(0), testutil.ToFloat64(counter.WithLabelValues("info"))) - - info, err := reader.info(ctx, tc.revision) - require.Equal(t, tc.expectedErr, err) - require.Equal(t, tc.expectedInfo, info) - - require.Equal(t, float64(1), testutil.ToFloat64(counter.WithLabelValues("info"))) - - // Verify that we do another request no matter whether the previous call - // succeeded or failed. - _, err = reader.info(ctx, "refs/heads/master") - require.NoError(t, err) - - require.Equal(t, float64(2), testutil.ToFloat64(counter.WithLabelValues("info"))) - }) - } -} diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go deleted file mode 100644 index 4d272d041..000000000 --- a/internal/git/catfile/object_reader.go +++ /dev/null @@ -1,174 +0,0 @@ -package catfile - -import ( - "bufio" - "context" - "fmt" - "io" - "sync" - - "github.com/opentracing/opentracing-go" - "github.com/prometheus/client_golang/prometheus" - "gitlab.com/gitlab-org/gitaly/v14/internal/command" - "gitlab.com/gitlab-org/gitaly/v14/internal/git" -) - -// Object represents data returned by `git cat-file --batch` -type Object struct { - // ObjectInfo represents main information about object - ObjectInfo - // Reader provides raw data about object. It differs for each type of object(tag, commit, tree, log, etc.) - io.Reader -} - -// objectReader is a reader for Git objects. Reading is implemented via a long-lived `git cat-file -// --batch` process such that we do not have to spawn a new process for each object we are about to -// read. -type objectReader struct { - cmd *command.Command - stdout *bufio.Reader - - // n is a state machine that tracks how much data we still have to read - // from r. Legal states are: n==0, this means we can do a new request on - // the cat-file process. n==1, this means that we have to discard a - // trailing newline. n>0, this means we are in the middle of reading a - // raw git object. - n int64 - - // Even though the batch type should not be used concurrently, I think - // that if that does happen by mistake we should give proper errors - // instead of doing unsafe memory writes (to n) and failing in some - // unpredictable way. - sync.Mutex - - // creationCtx is the context in which this reader has been created. This context may - // potentially be decorrelated from the "real" RPC context in case the reader is going to be - // cached. - creationCtx context.Context - counter *prometheus.CounterVec -} - -func newObjectReader( - ctx context.Context, - repo git.RepositoryExecutor, - counter *prometheus.CounterVec, -) (*objectReader, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.ObjectReader") - - batchCmd, err := repo.Exec(ctx, - git.SubCmd{ - Name: "cat-file", - Flags: []git.Option{ - git.Flag{Name: "--batch"}, - }, - }, - git.WithStdin(command.SetupStdin), - ) - if err != nil { - return nil, err - } - - objectReader := &objectReader{ - cmd: batchCmd, - stdout: bufio.NewReader(batchCmd), - creationCtx: ctx, - counter: counter, - } - go func() { - <-ctx.Done() - objectReader.Close() - span.Finish() - }() - - return objectReader, nil -} - -func (o *objectReader) Close() { - _ = o.cmd.Wait() -} - -func (o *objectReader) reader( - ctx context.Context, - revision git.Revision, - expectedType string, -) (*Object, error) { - finish := startSpan(o.creationCtx, ctx, fmt.Sprintf("Batch.Object(%s)", expectedType), revision) - defer finish() - - if o.counter != nil { - o.counter.WithLabelValues(expectedType).Inc() - } - - o.Lock() - defer o.Unlock() - - if o.n == 1 { - // Consume linefeed - if _, err := o.stdout.ReadByte(); err != nil { - return nil, err - } - o.n-- - } - - if o.n != 0 { - return nil, fmt.Errorf("cannot create new Object: batch contains %d unread bytes", o.n) - } - - if _, err := fmt.Fprintln(o.cmd, revision.String()); err != nil { - return nil, err - } - - oi, err := ParseObjectInfo(o.stdout) - if err != nil { - return nil, err - } - - o.n = oi.Size + 1 - - if oi.Type != expectedType { - // This is a programmer error and it should never happen. But if it does, - // we need to leave the cat-file process in a good state - if _, err := io.CopyN(io.Discard, o.stdout, o.n); err != nil { - return nil, err - } - o.n = 0 - - return nil, NotFoundError{error: fmt.Errorf("expected %s to be a %s, got %s", oi.Oid, expectedType, oi.Type)} - } - - return &Object{ - ObjectInfo: *oi, - Reader: &objectDataReader{ - objectReader: o, - r: io.LimitReader(o.stdout, oi.Size), - }, - }, nil -} - -func (o *objectReader) consume(nBytes int) { - o.n -= int64(nBytes) - if o.n < 1 { - panic("too many bytes read from batch") - } -} - -func (o *objectReader) hasUnreadData() bool { - o.Lock() - defer o.Unlock() - - return o.n > 1 -} - -type objectDataReader struct { - *objectReader - r io.Reader -} - -func (o *objectDataReader) Read(p []byte) (int, error) { - o.objectReader.Lock() - defer o.objectReader.Unlock() - - n, err := o.r.Read(p) - o.objectReader.consume(n) - return n, err -} diff --git a/internal/git/catfile/object_reader_test.go b/internal/git/catfile/object_reader_test.go deleted file mode 100644 index 50f07c4e2..000000000 --- a/internal/git/catfile/object_reader_test.go +++ /dev/null @@ -1,137 +0,0 @@ -package catfile - -import ( - "fmt" - "io" - "testing" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v14/internal/git" - "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" - "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text" - "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" - "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" -) - -func TestObjectReader_reader(t *testing.T) { - ctx, cancel := testhelper.Context() - defer cancel() - - cfg, repoProto, repoPath := testcfg.BuildWithRepo(t) - - commitID, err := git.NewObjectIDFromHex(text.ChompBytes(gittest.Exec(t, cfg, "-C", repoPath, "rev-parse", "refs/heads/master"))) - require.NoError(t, err) - commitContents := gittest.Exec(t, cfg, "-C", repoPath, "cat-file", "-p", "refs/heads/master") - - t.Run("read existing object by ref", func(t *testing.T) { - reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - object, err := reader.reader(ctx, "refs/heads/master", "commit") - require.NoError(t, err) - - data, err := io.ReadAll(object) - require.NoError(t, err) - require.Equal(t, commitContents, data) - }) - - t.Run("read existing object by object ID", func(t *testing.T) { - reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - object, err := reader.reader(ctx, commitID.Revision(), "commit") - require.NoError(t, err) - - data, err := io.ReadAll(object) - require.NoError(t, err) - - require.Contains(t, string(data), "Merge branch 'cherry-pick-ce369011' into 'master'\n") - }) - - t.Run("read commit with wrong type", func(t *testing.T) { - reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - _, err = reader.reader(ctx, commitID.Revision(), "tag") - require.EqualError(t, err, fmt.Sprintf("expected %s to be a tag, got commit", commitID)) - - // Verify that we're still able to read a commit after the previous read has failed. - object, err := reader.reader(ctx, commitID.Revision(), "commit") - require.NoError(t, err) - - data, err := io.ReadAll(object) - require.NoError(t, err) - - require.Equal(t, commitContents, data) - }) - - t.Run("read missing ref", func(t *testing.T) { - reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - _, err = reader.reader(ctx, "refs/heads/does-not-exist", "commit") - require.EqualError(t, err, "object not found") - - // Verify that we're still able to read a commit after the previous read has failed. - object, err := reader.reader(ctx, commitID.Revision(), "commit") - require.NoError(t, err) - - data, err := io.ReadAll(object) - require.NoError(t, err) - - require.Equal(t, commitContents, data) - }) - - t.Run("read fails when not consuming previous object", func(t *testing.T) { - reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - _, err = reader.reader(ctx, commitID.Revision(), "commit") - require.NoError(t, err) - - // We haven't yet consumed the previous object, so this must now fail. - _, err = reader.reader(ctx, commitID.Revision(), "commit") - require.EqualError(t, err, fmt.Sprintf("cannot create new Object: batch contains %d unread bytes", len(commitContents)+1)) - }) - - t.Run("read fails when partially consuming previous object", func(t *testing.T) { - reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - object, err := reader.reader(ctx, commitID.Revision(), "commit") - require.NoError(t, err) - - _, err = io.CopyN(io.Discard, object, 100) - require.NoError(t, err) - - // We haven't yet consumed the previous object, so this must now fail. - _, err = reader.reader(ctx, commitID.Revision(), "commit") - require.EqualError(t, err, fmt.Sprintf("cannot create new Object: batch contains %d unread bytes", len(commitContents)-100+1)) - }) - - t.Run("read increments Prometheus counter", func(t *testing.T) { - counter := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"type"}) - - reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), counter) - require.NoError(t, err) - - for objectType, revision := range map[string]git.Revision{ - "commit": "refs/heads/master", - "tree": "refs/heads/master^{tree}", - "blob": "refs/heads/master:README", - "tag": "refs/tags/v1.1.1", - } { - require.Equal(t, float64(0), testutil.ToFloat64(counter.WithLabelValues(objectType))) - - object, err := reader.reader(ctx, revision, objectType) - require.NoError(t, err) - - require.Equal(t, float64(1), testutil.ToFloat64(counter.WithLabelValues(objectType))) - - _, err = io.Copy(io.Discard, object) - require.NoError(t, err) - } - }) -} diff --git a/internal/git/catfile/objectinfo.go b/internal/git/catfile/objectinfo.go new file mode 100644 index 000000000..b9b4ef609 --- /dev/null +++ b/internal/git/catfile/objectinfo.go @@ -0,0 +1,65 @@ +package catfile + +import ( + "bufio" + "fmt" + "strconv" + "strings" + + "gitlab.com/gitlab-org/gitaly/v14/internal/git" +) + +// ObjectInfo represents a header returned by `git cat-file --batch` +type ObjectInfo struct { + Oid git.ObjectID + Type string + Size int64 +} + +// NotFoundError is returned when requesting an object that does not exist. +type NotFoundError struct{ error } + +// IsNotFound tests whether err has type NotFoundError. +func IsNotFound(err error) bool { + _, ok := err.(NotFoundError) + return ok +} + +// IsBlob returns true if object type is "blob" +func (o *ObjectInfo) IsBlob() bool { + return o.Type == "blob" +} + +// ParseObjectInfo reads from a reader and parses the data into an ObjectInfo struct +func ParseObjectInfo(stdout *bufio.Reader) (*ObjectInfo, error) { + infoLine, err := stdout.ReadString('\n') + if err != nil { + return nil, fmt.Errorf("read info line: %w", err) + } + + infoLine = strings.TrimSuffix(infoLine, "\n") + if strings.HasSuffix(infoLine, " missing") { + return nil, NotFoundError{fmt.Errorf("object not found")} + } + + info := strings.Split(infoLine, " ") + if len(info) != 3 { + return nil, fmt.Errorf("invalid info line: %q", infoLine) + } + + oid, err := git.NewObjectIDFromHex(info[0]) + if err != nil { + return nil, fmt.Errorf("parse object ID: %w", err) + } + + objectSize, err := strconv.ParseInt(info[2], 10, 64) + if err != nil { + return nil, fmt.Errorf("parse object size: %w", err) + } + + return &ObjectInfo{ + Oid: oid, + Type: info[1], + Size: objectSize, + }, nil +} diff --git a/internal/git/catfile/object_info_reader_fuzz.go b/internal/git/catfile/objectinfo_fuzz.go index 130abb4e7..130abb4e7 100644 --- a/internal/git/catfile/object_info_reader_fuzz.go +++ b/internal/git/catfile/objectinfo_fuzz.go diff --git a/internal/git/catfile/objectinfo_test.go b/internal/git/catfile/objectinfo_test.go new file mode 100644 index 000000000..46e9b39e2 --- /dev/null +++ b/internal/git/catfile/objectinfo_test.go @@ -0,0 +1,68 @@ +package catfile + +import ( + "bufio" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParseObjectInfoSuccess(t *testing.T) { + testCases := []struct { + desc string + input string + output *ObjectInfo + notFound bool + }{ + { + desc: "existing object", + input: "7c9373883988204e5a9f72c4a5119cbcefc83627 commit 222\n", + output: &ObjectInfo{ + Oid: "7c9373883988204e5a9f72c4a5119cbcefc83627", + Type: "commit", + Size: 222, + }, + }, + { + desc: "non existing object", + input: "bla missing\n", + notFound: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + reader := bufio.NewReader(strings.NewReader(tc.input)) + output, err := ParseObjectInfo(reader) + if tc.notFound { + require.True(t, IsNotFound(err), "expect NotFoundError") + return + } + + require.NoError(t, err) + require.Equal(t, tc.output, output) + }) + } +} + +func TestParseObjectInfoErrors(t *testing.T) { + testCases := []struct { + desc string + input string + }{ + {desc: "missing newline", input: "7c9373883988204e5a9f72c4a5119cbcefc83627 commit 222"}, + {desc: "too few words", input: "7c9373883988204e5a9f72c4a5119cbcefc83627 commit\n"}, + {desc: "too many words", input: "7c9373883988204e5a9f72c4a5119cbcefc83627 commit 222 bla\n"}, + {desc: "parse object size", input: "7c9373883988204e5a9f72c4a5119cbcefc83627 commit bla\n"}, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + reader := bufio.NewReader(strings.NewReader(tc.input)) + _, err := ParseObjectInfo(reader) + + require.Error(t, err) + }) + } +} diff --git a/internal/git/catfile/testhelper_test.go b/internal/git/catfile/testhelper_test.go index d78dc8092..f0fe098d2 100644 --- a/internal/git/catfile/testhelper_test.go +++ b/internal/git/catfile/testhelper_test.go @@ -1,14 +1,9 @@ package catfile import ( - "context" "os" "testing" - "gitlab.com/gitlab-org/gitaly/v14/internal/command" - "gitlab.com/gitlab-org/gitaly/v14/internal/git" - "gitlab.com/gitlab-org/gitaly/v14/internal/git/repository" - "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" ) @@ -22,27 +17,3 @@ func testMain(m *testing.M) int { defer cleanup() return m.Run() } - -type repoExecutor struct { - repository.GitRepo - gitCmdFactory git.CommandFactory -} - -func newRepoExecutor(t *testing.T, cfg config.Cfg, repo repository.GitRepo) git.RepositoryExecutor { - return &repoExecutor{ - GitRepo: repo, - gitCmdFactory: git.NewExecCommandFactory(cfg), - } -} - -func (e *repoExecutor) Exec(ctx context.Context, cmd git.Cmd, opts ...git.CmdOpt) (*command.Command, error) { - return e.gitCmdFactory.New(ctx, e.GitRepo, cmd, opts...) -} - -func (e *repoExecutor) ExecAndWait(ctx context.Context, cmd git.Cmd, opts ...git.CmdOpt) error { - command, err := e.Exec(ctx, cmd, opts...) - if err != nil { - return err - } - return command.Wait() -} diff --git a/internal/git/catfile/tracing.go b/internal/git/catfile/tracing.go deleted file mode 100644 index 77a51304e..000000000 --- a/internal/git/catfile/tracing.go +++ /dev/null @@ -1,27 +0,0 @@ -package catfile - -import ( - "context" - - "github.com/opentracing/opentracing-go" - "gitlab.com/gitlab-org/gitaly/v14/internal/git" - "gitlab.com/gitlab-org/labkit/correlation" -) - -func revisionTag(revision git.Revision) opentracing.Tag { - return opentracing.Tag{Key: "revision", Value: revision} -} - -func correlationIDTag(ctx context.Context) opentracing.Tag { - return opentracing.Tag{Key: "correlation_id", Value: correlation.ExtractFromContext(ctx)} -} - -func startSpan(innerCtx context.Context, outerCtx context.Context, methodName string, revision git.Revision) func() { - innerSpan, ctx := opentracing.StartSpanFromContext(innerCtx, methodName, revisionTag(revision)) - outerSpan, _ := opentracing.StartSpanFromContext(outerCtx, methodName, revisionTag(revision), correlationIDTag(ctx)) - - return func() { - innerSpan.Finish() - outerSpan.Finish() - } -} |