diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-07-23 12:10:45 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-07-23 12:10:45 +0300 |
commit | 287bcaf9697fc6d29e66dc22bbba97b290aa5042 (patch) | |
tree | 611732487c7eb474ca74179da6031e51955e842c | |
parent | bb39fb2c6f620cbcf86d408cf4b853e478c6705f (diff) | |
parent | 261c3a776526ec8739bfaa774626ff03e633a512 (diff) |
Merge branch 'tracing-catfile-batch' into 'master'
tracing: Improve instrumentation of catfile.Batch
See merge request gitlab-org/gitaly!3688
-rw-r--r-- | internal/git/catfile/batch.go | 76 | ||||
-rw-r--r-- | internal/git/catfile/batch_cache.go | 10 | ||||
-rw-r--r-- | internal/git/catfile/batch_check_process.go | 7 | ||||
-rw-r--r-- | internal/git/catfile/batch_process.go | 7 |
4 files changed, 69 insertions, 31 deletions
diff --git a/internal/git/catfile/batch.go b/internal/git/catfile/batch.go index 32445283f..8bf739775 100644 --- a/internal/git/catfile/batch.go +++ b/internal/git/catfile/batch.go @@ -7,6 +7,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/v14/internal/git" + "gitlab.com/gitlab-org/labkit/correlation" ) const ( @@ -101,7 +102,15 @@ type simulatedBatchSpawnError struct{} func (simulatedBatchSpawnError) Error() string { return "simulated spawn error" } -func (bc *BatchCache) newBatch(ctx context.Context, repo git.RepositoryExecutor) (_ *batch, err error) { +func (bc *BatchCache) newBatch(ctx context.Context, repo git.RepositoryExecutor) (*batch, context.Context, error) { + var err error + + // batch processes are long-lived and reused across RPCs, + // so we de-correlate the process from the RPC + ctx = correlation.ContextWithCorrelation(ctx, "") + ctx = opentracing.ContextWithSpan(ctx, nil) + span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.Batch") + ctx, cancel := context.WithCancel(ctx) defer func() { if err != nil { @@ -109,31 +118,48 @@ func (bc *BatchCache) newBatch(ctx context.Context, repo git.RepositoryExecutor) } }() + go func() { + <-ctx.Done() + span.Finish() + }() + batchProcess, err := bc.newBatchProcess(ctx, repo) if err != nil { - return nil, err + return nil, ctx, err } batchCheckProcess, err := bc.newBatchCheckProcess(ctx, repo) if err != nil { - return nil, err + return nil, ctx, err } - return &batch{batchProcess: batchProcess, batchCheckProcess: batchCheckProcess}, nil + return &batch{batchProcess: batchProcess, batchCheckProcess: batchCheckProcess}, ctx, nil } -func newInstrumentedBatch(c Batch, catfileLookupCounter *prometheus.CounterVec) Batch { - return &instrumentedBatch{c, catfileLookupCounter} +func newInstrumentedBatch(ctx context.Context, c Batch, catfileLookupCounter *prometheus.CounterVec) Batch { + return &instrumentedBatch{ + Batch: c, + catfileLookupCounter: catfileLookupCounter, + batchCtx: ctx, + } } +// We maintain two contexts here: the one RPC-level one, and one batch-level one. +// +// The batchCtx tracks the lifetime of the long-running batch process, and is +// de-correlated from the RPC, as it is shared between many RPCs. +// +// We perform double accounting and re-correlation to get stats and traces per +// batch process. type instrumentedBatch struct { Batch catfileLookupCounter *prometheus.CounterVec + batchCtx context.Context } func (ib *instrumentedBatch) Info(ctx context.Context, revision git.Revision) (*ObjectInfo, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Batch.Info", opentracing.Tag{Key: "revision", Value: revision}) - defer span.Finish() + ctx, finish := ib.startSpan(ctx, "Batch.Info", revision) + defer finish() ib.catfileLookupCounter.WithLabelValues("info").Inc() @@ -141,8 +167,8 @@ func (ib *instrumentedBatch) Info(ctx context.Context, revision git.Revision) (* } func (ib *instrumentedBatch) Tree(ctx context.Context, revision git.Revision) (*Object, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Batch.Tree", opentracing.Tag{Key: "revision", Value: revision}) - defer span.Finish() + ctx, finish := ib.startSpan(ctx, "Batch.Tree", revision) + defer finish() ib.catfileLookupCounter.WithLabelValues("tree").Inc() @@ -150,8 +176,8 @@ func (ib *instrumentedBatch) Tree(ctx context.Context, revision git.Revision) (* } func (ib *instrumentedBatch) Commit(ctx context.Context, revision git.Revision) (*Object, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Batch.Commit", opentracing.Tag{Key: "revision", Value: revision}) - defer span.Finish() + ctx, finish := ib.startSpan(ctx, "Batch.Commit", revision) + defer finish() ib.catfileLookupCounter.WithLabelValues("commit").Inc() @@ -159,8 +185,8 @@ func (ib *instrumentedBatch) Commit(ctx context.Context, revision git.Revision) } func (ib *instrumentedBatch) Blob(ctx context.Context, revision git.Revision) (*Object, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Batch.Blob", opentracing.Tag{Key: "revision", Value: revision}) - defer span.Finish() + ctx, finish := ib.startSpan(ctx, "Batch.Blob", revision) + defer finish() ib.catfileLookupCounter.WithLabelValues("blob").Inc() @@ -168,10 +194,28 @@ func (ib *instrumentedBatch) Blob(ctx context.Context, revision git.Revision) (* } func (ib *instrumentedBatch) Tag(ctx context.Context, revision git.Revision) (*Object, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "Batch.Tag", opentracing.Tag{Key: "revision", Value: revision}) - defer span.Finish() + ctx, finish := ib.startSpan(ctx, "Batch.Tag", revision) + defer finish() ib.catfileLookupCounter.WithLabelValues("tag").Inc() return ib.Batch.Tag(ctx, revision) } + +func (ib *instrumentedBatch) revisionTag(revision git.Revision) opentracing.Tag { + return opentracing.Tag{Key: "revision", Value: revision} +} + +func (ib *instrumentedBatch) correlationIDTag(ctx context.Context) opentracing.Tag { + return opentracing.Tag{Key: "correlation_id", Value: correlation.ExtractFromContext(ctx)} +} + +func (ib *instrumentedBatch) startSpan(ctx context.Context, methodName string, revision git.Revision) (context.Context, func()) { + span, ctx := opentracing.StartSpanFromContext(ctx, methodName, ib.revisionTag(revision)) + span2, _ := opentracing.StartSpanFromContext(ib.batchCtx, methodName, ib.revisionTag(revision), ib.correlationIDTag(ctx)) + + return ctx, func() { + span.Finish() + span2.Finish() + } +} diff --git a/internal/git/catfile/batch_cache.go b/internal/git/catfile/batch_cache.go index a9c2bddc6..336d9db3d 100644 --- a/internal/git/catfile/batch_cache.go +++ b/internal/git/catfile/batch_cache.go @@ -178,11 +178,11 @@ func (bc *BatchCache) BatchProcess(ctx context.Context, repo git.RepositoryExecu sessionID := metadata.GetValue(ctx, SessionIDField) if sessionID == "" { - c, err := bc.newBatch(ctx, repo) + c, ctx, err := bc.newBatch(ctx, repo) if err != nil { return nil, err } - return newInstrumentedBatch(c, bc.catfileLookupCounter), err + return newInstrumentedBatch(ctx, c, bc.catfileLookupCounter), err } cacheKey := newCacheKey(sessionID, repo) @@ -190,13 +190,13 @@ func (bc *BatchCache) BatchProcess(ctx context.Context, repo git.RepositoryExecu if c, ok := bc.checkout(cacheKey); ok { go bc.returnWhenDone(requestDone, cacheKey, c) - return newInstrumentedBatch(c, bc.catfileLookupCounter), nil + return newInstrumentedBatch(ctx, c, bc.catfileLookupCounter), nil } // if we are using caching, create a fresh context for the new batch // and initialize the new batch with a bc key and cancel function cacheCtx, cacheCancel := context.WithCancel(context.Background()) - c, err := bc.newBatch(cacheCtx, repo) + c, ctx, err := bc.newBatch(cacheCtx, repo) if err != nil { cacheCancel() return nil, err @@ -205,7 +205,7 @@ func (bc *BatchCache) BatchProcess(ctx context.Context, repo git.RepositoryExecu c.cancel = cacheCancel go bc.returnWhenDone(requestDone, cacheKey, c) - return newInstrumentedBatch(c, bc.catfileLookupCounter), nil + return newInstrumentedBatch(ctx, c, bc.catfileLookupCounter), nil } func (bc *BatchCache) returnWhenDone(done <-chan struct{}, cacheKey key, c *batch) { diff --git a/internal/git/catfile/batch_check_process.go b/internal/git/catfile/batch_check_process.go index b4cbc8675..2ff5256e5 100644 --- a/internal/git/catfile/batch_check_process.go +++ b/internal/git/catfile/batch_check_process.go @@ -9,7 +9,6 @@ import ( "github.com/opentracing/opentracing-go" "gitlab.com/gitlab-org/gitaly/v14/internal/git" - "gitlab.com/gitlab-org/labkit/correlation" ) // batchCheckProcess encapsulates a 'git cat-file --batch-check' process @@ -25,10 +24,7 @@ func (bc *BatchCache) newBatchCheckProcess(ctx context.Context, repo git.Reposit var stdinReader io.Reader stdinReader, process.w = io.Pipe() - // batch processes are long-lived and reused across RPCs, - // so we de-correlate the process from the RPC - ctx = correlation.ContextWithCorrelation(ctx, "") - ctx = opentracing.ContextWithSpan(ctx, nil) + span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.BatchCheckProcess") batchCmd, err := repo.Exec(ctx, git.SubCmd{ @@ -48,6 +44,7 @@ func (bc *BatchCache) newBatchCheckProcess(ctx context.Context, repo git.Reposit <-ctx.Done() // This is crucial to prevent leaking file descriptors. process.w.Close() + span.Finish() }() if bc.injectSpawnErrors { diff --git a/internal/git/catfile/batch_process.go b/internal/git/catfile/batch_process.go index 243dbc658..30a79c963 100644 --- a/internal/git/catfile/batch_process.go +++ b/internal/git/catfile/batch_process.go @@ -10,7 +10,6 @@ import ( "github.com/opentracing/opentracing-go" "gitlab.com/gitlab-org/gitaly/v14/internal/git" - "gitlab.com/gitlab-org/labkit/correlation" ) // batch encapsulates a 'git cat-file --batch' process @@ -39,10 +38,7 @@ func (bc *BatchCache) newBatchProcess(ctx context.Context, repo git.RepositoryEx var stdinReader io.Reader stdinReader, b.w = io.Pipe() - // batch processes are long-lived and reused across RPCs, - // so we de-correlate the process from the RPC - ctx = correlation.ContextWithCorrelation(ctx, "") - ctx = opentracing.ContextWithSpan(ctx, nil) + span, ctx := opentracing.StartSpanFromContext(ctx, "catfile.BatchProcess") batchCmd, err := repo.Exec(ctx, git.SubCmd{ @@ -65,6 +61,7 @@ func (bc *BatchCache) newBatchProcess(ctx context.Context, repo git.RepositoryEx // This Close() is crucial to prevent leaking file descriptors. b.w.Close() bc.currentCatfileProcesses.Dec() + span.Finish() }() if bc.injectSpawnErrors { |