Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-02-27 05:42:53 +0300
committerQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-03-07 06:38:53 +0300
commite8f47f99297377884e7dda554617a4fa7142c7e0 (patch)
treec8d9a169b9130b46ba7505af2b1ca3bd29830600 /internal
parent1739a8ca9a5786b4730620b742153f45e00cb094 (diff)
tracing: Fix disconnected spans after go through gitaly-hooks
gitaly-hooks is a sanitized environment. It does not initialize tracing nor export any tracing spans, even though its process environment variables set includes tracing info. The hook process then triggers RPC call back to Gitaly. Those RPCs are handled, but the corresponding spans are disconnected from the original source. This behavior makes a full flow involving gitaly-hooks broken and scattered all over the place. This commit fixes this situation by implementing two passthrough gRPC interceptors. They read the tracing info from env and then write into outgoing metadata without modification.
Diffstat (limited to 'internal')
-rw-r--r--internal/git/hooks_options.go6
-rw-r--r--internal/tracing/passthrough.go71
-rw-r--r--internal/tracing/passthrough_test.go300
3 files changed, 377 insertions, 0 deletions
diff --git a/internal/git/hooks_options.go b/internal/git/hooks_options.go
index c99f87f92..54c40a37b 100644
--- a/internal/git/hooks_options.go
+++ b/internal/git/hooks_options.go
@@ -12,8 +12,13 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v15/internal/transaction/txinfo"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
+ labkittracing "gitlab.com/gitlab-org/labkit/tracing"
)
+// envInjector is responsible for injecting environment variables required for tracing into
+// the child process.
+var envInjector = labkittracing.NewEnvInjector()
+
// WithDisabledHooks returns an option that satisfies the requirement to set up
// hooks, but won't in fact set up hook execution.
func WithDisabledHooks() CmdOpt {
@@ -124,6 +129,7 @@ func (cc *cmdCfg) configureHooks(
payload,
fmt.Sprintf("%s=%s", log.GitalyLogDirEnvKey, cfg.Logging.Dir),
)
+ cc.env = envInjector(ctx, cc.env)
cc.globals = append(cc.globals, ConfigPair{Key: "core.hooksPath", Value: gitCmdFactory.HooksPath(ctx)})
cc.hooksConfigured = true
diff --git a/internal/tracing/passthrough.go b/internal/tracing/passthrough.go
new file mode 100644
index 000000000..9d6548b07
--- /dev/null
+++ b/internal/tracing/passthrough.go
@@ -0,0 +1,71 @@
+package tracing
+
+import (
+ "context"
+ "strings"
+
+ "github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
+ "github.com/opentracing/opentracing-go"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+)
+
+// ExtractSpanContextFromEnv extracts a SpanContext from the environment variable list. The caller
+// usually passes the result of os.Environ() into this method.
+func ExtractSpanContextFromEnv(envs []string) (opentracing.SpanContext, error) {
+ envMap := environAsMap(envs)
+ return opentracing.GlobalTracer().Extract(
+ opentracing.TextMap,
+ opentracing.TextMapCarrier(envMap),
+ )
+}
+
+// UnaryPassthroughInterceptor is a client gRPC unary interceptor that rewrites a span context into
+// the outgoing metadata of the call. It is useful for intermediate systems who don't want to
+// start new spans.
+func UnaryPassthroughInterceptor(spanContext opentracing.SpanContext) grpc.UnaryClientInterceptor {
+ return func(parentCtx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+ ctxWithMetadata := injectSpanContext(parentCtx, spanContext)
+ return invoker(ctxWithMetadata, method, req, reply, cc, opts...)
+ }
+}
+
+// StreamPassthroughInterceptor is equivalent to UnaryPassthroughInterceptor, but for streaming
+// gRPC calls.
+func StreamPassthroughInterceptor(spanContext opentracing.SpanContext) grpc.StreamClientInterceptor {
+ return func(parentCtx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+ ctxWithMetadata := injectSpanContext(parentCtx, spanContext)
+ return streamer(ctxWithMetadata, desc, cc, method, opts...)
+ }
+}
+
+func injectSpanContext(parentCtx context.Context, spanContext opentracing.SpanContext) context.Context {
+ tracer := opentracing.GlobalTracer()
+ md := metautils.ExtractOutgoing(parentCtx).Clone()
+ if err := tracer.Inject(spanContext, opentracing.HTTPHeaders, metadataTextMap(md)); err != nil {
+ return parentCtx
+ }
+ ctxWithMetadata := md.ToOutgoing(parentCtx)
+ return ctxWithMetadata
+}
+
+func environAsMap(env []string) map[string]string {
+ envMap := make(map[string]string, len(env))
+ for _, v := range env {
+ s := strings.SplitN(v, "=", 2)
+ envMap[s[0]] = s[1]
+ }
+ return envMap
+}
+
+// metadataTextMap is a wrapper for gRPC's metadata.MD. It implements opentracing.TextMapWriter,
+// which is to set opentracing-related fields. In this use case, the passthrough interceptors touch
+// span identify and maybe some luggage or tag fields. This implementation is good-enough for such
+// fields. gRPC header name format is:
+// > Header-Name → 1*( %x30-39 / %x61-7A / "_" / "-" / ".") ; 0-9 a-z _ - .
+// > Source: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
+type metadataTextMap metadata.MD
+
+func (m metadataTextMap) Set(key, val string) {
+ m[strings.ToLower(key)] = []string{val}
+}
diff --git a/internal/tracing/passthrough_test.go b/internal/tracing/passthrough_test.go
new file mode 100644
index 000000000..01daad835
--- /dev/null
+++ b/internal/tracing/passthrough_test.go
@@ -0,0 +1,300 @@
+package tracing
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "testing"
+
+ "github.com/opentracing/opentracing-go"
+ "github.com/stretchr/testify/require"
+ "github.com/uber/jaeger-client-go"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
+ grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/test/grpc_testing"
+)
+
+func TestExtractSpanContextFromEnv(t *testing.T) {
+ _ = stubTracingReporter(t)
+
+ injectedSpan := opentracing.StartSpan("test", opentracing.Tag{Key: "do-not-carry", Value: "value"})
+ injectedSpan.SetBaggageItem("hi", "hello")
+
+ jaegerInjectedSpan := injectedSpan.(*jaeger.Span)
+ jaegerInjectedSpanContext := jaegerInjectedSpan.SpanContext()
+
+ createSpanContext := func() []string {
+ env := envMap{}
+ err := opentracing.GlobalTracer().Inject(injectedSpan.Context(), opentracing.TextMap, env)
+ require.NoError(t, err)
+ return env.toSlice()
+ }
+
+ tests := []struct {
+ desc string
+ envs []string
+ expectedContext opentracing.SpanContext
+ expectedError string
+ }{
+ {
+ desc: "empty environment map",
+ envs: []string{},
+ expectedError: "opentracing: SpanContext not found in Extract carrier",
+ },
+ {
+ desc: "irrelevant environment map",
+ envs: []string{"SOME_THING=A", "SOMETHING_ELSE=B"},
+ expectedError: "opentracing: SpanContext not found in Extract carrier",
+ },
+ {
+ desc: "environment variable includes span context",
+ envs: createSpanContext(),
+ },
+ }
+ for _, tc := range tests {
+ t.Run(tc.desc, func(t *testing.T) {
+ spanContext, err := ExtractSpanContextFromEnv(tc.envs)
+ if tc.expectedError != "" {
+ require.Equal(t, tc.expectedError, err.Error())
+ } else {
+ require.NoError(t, err)
+ require.NotNil(t, spanContext)
+
+ span := opentracing.StartSpan("test", opentracing.ChildOf(spanContext))
+ jaegerSpan := span.(*jaeger.Span)
+ jaegerSpanContext := jaegerSpan.SpanContext()
+
+ require.Equal(t, jaegerInjectedSpanContext.TraceID(), jaegerSpanContext.TraceID())
+ require.Equal(t, jaegerInjectedSpanContext.SpanID(), jaegerSpanContext.ParentID())
+ require.Equal(t, opentracing.Tags{}, jaegerSpan.Tags())
+ require.Equal(t, "hello", jaegerSpan.BaggageItem("hi"))
+ }
+ })
+ }
+}
+
+func TestUnaryPassthroughInterceptor(t *testing.T) {
+ reporter := stubTracingReporter(t)
+ defer reporter.Reset()
+
+ tests := []struct {
+ desc string
+ setup func(*testing.T) (jaeger.SpanID, opentracing.SpanContext, func())
+ expectedSpans []string
+ }{
+ {
+ desc: "empty span context",
+ setup: func(t *testing.T) (jaeger.SpanID, opentracing.SpanContext, func()) {
+ return 0, nil, func() {}
+ },
+ expectedSpans: []string{
+ "/grpc.testing.TestService/UnaryCall",
+ },
+ },
+ {
+ desc: "span context with a simple span",
+ setup: func(t *testing.T) (jaeger.SpanID, opentracing.SpanContext, func()) {
+ span := opentracing.GlobalTracer().StartSpan("root")
+ return span.(*jaeger.Span).SpanContext().SpanID(), span.Context(), span.Finish
+ },
+ expectedSpans: []string{
+ "/grpc.testing.TestService/UnaryCall",
+ "root",
+ },
+ },
+ {
+ desc: "span context with a trace chain",
+ setup: func(t *testing.T) (jaeger.SpanID, opentracing.SpanContext, func()) {
+ root := opentracing.GlobalTracer().StartSpan("root")
+ child := opentracing.GlobalTracer().StartSpan("child", opentracing.ChildOf(root.Context()))
+ grandChild := opentracing.GlobalTracer().StartSpan("grandChild", opentracing.ChildOf(child.Context()))
+
+ return grandChild.(*jaeger.Span).SpanContext().SpanID(), grandChild.Context(), func() {
+ grandChild.Finish()
+ child.Finish()
+ root.Finish()
+ }
+ },
+ expectedSpans: []string{
+ "/grpc.testing.TestService/UnaryCall",
+ "grandChild",
+ "child",
+ "root",
+ },
+ },
+ }
+ for _, tc := range tests {
+ t.Run(tc.desc, func(t *testing.T) {
+ reporter.Reset()
+
+ var parentID jaeger.SpanID
+ service := &testSvc{
+ unaryCall: func(ctx context.Context, request *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
+ if span := opentracing.SpanFromContext(ctx); span != nil {
+ parentID = span.(*jaeger.Span).SpanContext().ParentID()
+ }
+ return &grpc_testing.SimpleResponse{}, nil
+ },
+ }
+ expectedParentID, spanContext, finishFunc := tc.setup(t)
+
+ client := startFakeGitalyServer(t, service, spanContext)
+ _, err := client.UnaryCall(testhelper.Context(t), &grpc_testing.SimpleRequest{})
+ require.NoError(t, err)
+
+ finishFunc()
+ require.Equal(t, expectedParentID, parentID)
+ require.Equal(t, tc.expectedSpans, reportedSpans(t, reporter))
+ })
+ }
+}
+
+func TestStreamPassthroughInterceptor(t *testing.T) {
+ reporter := stubTracingReporter(t)
+ defer reporter.Reset()
+
+ tests := []struct {
+ desc string
+ setup func(*testing.T) (jaeger.SpanID, opentracing.SpanContext, func())
+ expectedSpans []string
+ }{
+ {
+ desc: "empty span context",
+ setup: func(t *testing.T) (jaeger.SpanID, opentracing.SpanContext, func()) {
+ return 0, nil, func() {}
+ },
+ expectedSpans: []string{
+ "/grpc.testing.TestService/FullDuplexCall",
+ },
+ },
+ {
+ desc: "span context with a simple span",
+ setup: func(t *testing.T) (jaeger.SpanID, opentracing.SpanContext, func()) {
+ span := opentracing.GlobalTracer().StartSpan("root")
+ return span.(*jaeger.Span).SpanContext().SpanID(), span.Context(), span.Finish
+ },
+ expectedSpans: []string{
+ "/grpc.testing.TestService/FullDuplexCall",
+ "root",
+ },
+ },
+ {
+ desc: "span context with a trace chain",
+ setup: func(t *testing.T) (jaeger.SpanID, opentracing.SpanContext, func()) {
+ root := opentracing.GlobalTracer().StartSpan("root")
+ child := opentracing.GlobalTracer().StartSpan("child", opentracing.ChildOf(root.Context()))
+ grandChild := opentracing.GlobalTracer().StartSpan("grandChild", opentracing.ChildOf(child.Context()))
+
+ return grandChild.(*jaeger.Span).SpanContext().SpanID(), grandChild.Context(), func() {
+ grandChild.Finish()
+ child.Finish()
+ root.Finish()
+ }
+ },
+ expectedSpans: []string{
+ "/grpc.testing.TestService/FullDuplexCall",
+ "grandChild",
+ "child",
+ "root",
+ },
+ },
+ }
+ for _, tc := range tests {
+ t.Run(tc.desc, func(t *testing.T) {
+ reporter.Reset()
+
+ var parentID jaeger.SpanID
+ service := &testSvc{
+ fullDuplexCall: func(stream grpc_testing.TestService_FullDuplexCallServer) error {
+ _, err := stream.Recv()
+ require.NoError(t, err)
+ if span := opentracing.SpanFromContext(stream.Context()); span != nil {
+ parentID = span.(*jaeger.Span).SpanContext().ParentID()
+ }
+ require.NoError(t, stream.Send(&grpc_testing.StreamingOutputCallResponse{}))
+ return nil
+ },
+ }
+ expectedParentID, spanContext, finishFunc := tc.setup(t)
+
+ client := startFakeGitalyServer(t, service, spanContext)
+ stream, err := client.FullDuplexCall(testhelper.Context(t))
+ require.NoError(t, err)
+
+ require.NoError(t, stream.Send(&grpc_testing.StreamingOutputCallRequest{}))
+ _, err = stream.Recv()
+ require.NoError(t, err)
+ finishFunc()
+
+ require.Equal(t, expectedParentID, parentID)
+ require.Equal(t, tc.expectedSpans, reportedSpans(t, reporter))
+ })
+ }
+}
+
+type testSvc struct {
+ grpc_testing.UnimplementedTestServiceServer
+ unaryCall func(context.Context, *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error)
+ fullDuplexCall func(stream grpc_testing.TestService_FullDuplexCallServer) error
+}
+
+func (ts *testSvc) UnaryCall(ctx context.Context, r *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
+ return ts.unaryCall(ctx, r)
+}
+
+func (ts *testSvc) FullDuplexCall(stream grpc_testing.TestService_FullDuplexCallServer) error {
+ return ts.fullDuplexCall(stream)
+}
+
+func startFakeGitalyServer(t *testing.T, svc *testSvc, spanContext opentracing.SpanContext) grpc_testing.TestServiceClient {
+ t.Helper()
+
+ listener, err := net.Listen("tcp", "localhost:0")
+ require.NoError(t, err)
+
+ srv := grpc.NewServer(
+ grpc.StreamInterceptor(grpctracing.StreamServerTracingInterceptor()),
+ grpc.UnaryInterceptor(grpctracing.UnaryServerTracingInterceptor()),
+ )
+ grpc_testing.RegisterTestServiceServer(srv, svc)
+
+ go testhelper.MustServe(t, srv, listener)
+ t.Cleanup(srv.Stop)
+
+ conn, err := grpc.Dial(
+ listener.Addr().String(),
+ grpc.WithTransportCredentials(insecure.NewCredentials()),
+ grpc.WithUnaryInterceptor(UnaryPassthroughInterceptor(spanContext)),
+ grpc.WithStreamInterceptor(StreamPassthroughInterceptor(spanContext)),
+ )
+ require.NoError(t, err)
+
+ return grpc_testing.NewTestServiceClient(conn)
+}
+
+// envMap implements opentracing.TextMapReader and opentracing.TextMapWriter. It is used to create
+// testing environment maps used in below tests
+type envMap map[string]string
+
+func (e envMap) Set(key, val string) {
+ e[key] = val
+}
+
+func (e envMap) ForeachKey(handler func(key string, val string) error) error {
+ for key, val := range e {
+ if err := handler(key, val); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (e envMap) toSlice() []string {
+ var envSlice []string
+ for key, value := range e {
+ envSlice = append(envSlice, fmt.Sprintf("%s=%s", key, value))
+ }
+ return envSlice
+}