diff options
author | Mikhail Mazurskiy <mmazurskiy@gitlab.com> | 2022-08-26 15:56:48 +0300 |
---|---|---|
committer | Mikhail Mazurskiy <mmazurskiy@gitlab.com> | 2022-08-26 15:56:48 +0300 |
commit | dcf3a69a730c676ff486f81c535dba2e4e40d602 (patch) | |
tree | 67298ee28af07f52959982560827869aacc4784f | |
parent | b7b64d3ec5ff19f50d432a07cd6ac6c05ab141f5 (diff) |
Let client users specify interceptorsash2k/client-fixes
Do not assume client users want the same tracing interceptors with the same configuration.
-rw-r--r-- | client/dial_test.go | 23 | ||||
-rw-r--r-- | cmd/gitaly-backup/create.go | 3 | ||||
-rw-r--r-- | cmd/gitaly-backup/restore.go | 3 | ||||
-rw-r--r-- | cmd/gitaly-ssh/main.go | 3 | ||||
-rw-r--r-- | cmd/gitaly/main.go | 7 | ||||
-rw-r--r-- | cmd/praefect/subcmd.go | 3 | ||||
-rw-r--r-- | internal/git/gittest/repo.go | 3 | ||||
-rw-r--r-- | internal/gitaly/client/dial.go | 13 | ||||
-rw-r--r-- | internal/gitaly/service/repository/testhelper_test.go | 4 | ||||
-rw-r--r-- | internal/gitaly/transaction/manager.go | 7 | ||||
-rw-r--r-- | internal/praefect/nodes/manager.go | 2 | ||||
-rw-r--r-- | internal/praefect/nodes/ping.go | 3 | ||||
-rw-r--r-- | internal/praefect/service/checks.go | 7 | ||||
-rw-r--r-- | internal/testhelper/testserver/gitaly.go | 5 |
14 files changed, 67 insertions, 19 deletions
diff --git a/client/dial_test.go b/client/dial_test.go index 6430be714..758ff1013 100644 --- a/client/dial_test.go +++ b/client/dial_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/require" "github.com/uber/jaeger-client-go" gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth" + internalclient "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" gitalyx509 "gitlab.com/gitlab-org/gitaly/v15/internal/x509" "gitlab.com/gitlab-org/labkit/correlation" @@ -300,7 +301,9 @@ func TestDial_Correlation(t *testing.T) { defer grpcServer.Stop() ctx := testhelper.Context(t) - cc, err := DialContext(ctx, "unix://"+serverSocketPath, nil) + cc, err := DialContext(ctx, "unix://"+serverSocketPath, []grpc.DialOption{ + internalclient.UnaryInterceptor(), internalclient.StreamInterceptor(), + }) require.NoError(t, err) defer cc.Close() @@ -333,7 +336,9 @@ func TestDial_Correlation(t *testing.T) { defer grpcServer.Stop() ctx := testhelper.Context(t) - cc, err := DialContext(ctx, "unix://"+serverSocketPath, nil) + cc, err := DialContext(ctx, "unix://"+serverSocketPath, []grpc.DialOption{ + internalclient.UnaryInterceptor(), internalclient.StreamInterceptor(), + }) require.NoError(t, err) defer cc.Close() @@ -404,7 +409,9 @@ func TestDial_Tracing(t *testing.T) { // This needs to be run after setting up the global tracer as it will cause us to // create the span when executing the RPC call further down below. - cc, err := DialContext(ctx, "unix://"+serverSocketPath, nil) + cc, err := DialContext(ctx, "unix://"+serverSocketPath, []grpc.DialOption{ + internalclient.UnaryInterceptor(), internalclient.StreamInterceptor(), + }) require.NoError(t, err) defer cc.Close() @@ -461,7 +468,9 @@ func TestDial_Tracing(t *testing.T) { // This needs to be run after setting up the global tracer as it will cause us to // create the span when executing the RPC call further down below. - cc, err := DialContext(ctx, "unix://"+serverSocketPath, nil) + cc, err := DialContext(ctx, "unix://"+serverSocketPath, []grpc.DialOption{ + internalclient.UnaryInterceptor(), internalclient.StreamInterceptor(), + }) require.NoError(t, err) defer cc.Close() @@ -626,7 +635,11 @@ func TestHealthCheckDialer(t *testing.T) { _, err := HealthCheckDialer(DialContext)(ctx, addr, nil) testhelper.RequireGrpcError(t, status.Error(codes.Unauthenticated, "authentication required"), err) - cc, err := HealthCheckDialer(DialContext)(ctx, addr, []grpc.DialOption{grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2("token"))}) + cc, err := HealthCheckDialer(DialContext)(ctx, addr, []grpc.DialOption{ + grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2("token")), + internalclient.UnaryInterceptor(), + internalclient.StreamInterceptor(), + }) require.NoError(t, err) require.NoError(t, cc.Close()) } diff --git a/cmd/gitaly-backup/create.go b/cmd/gitaly-backup/create.go index 1bf3b5e72..1faa69446 100644 --- a/cmd/gitaly-backup/create.go +++ b/cmd/gitaly-backup/create.go @@ -12,6 +12,7 @@ import ( log "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/v15/client" "gitlab.com/gitlab-org/gitaly/v15/internal/backup" + internalclient "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" ) @@ -52,7 +53,7 @@ func (cmd *createSubcommand) Run(ctx context.Context, stdin io.Reader, stdout io return fmt.Errorf("create: resolve locator: %w", err) } - pool := client.NewPool() + pool := client.NewPool(internalclient.UnaryInterceptor(), internalclient.StreamInterceptor()) defer pool.Close() manager := backup.NewManager(sink, locator, pool, cmd.backupID) diff --git a/cmd/gitaly-backup/restore.go b/cmd/gitaly-backup/restore.go index 4bdf16fc0..525269464 100644 --- a/cmd/gitaly-backup/restore.go +++ b/cmd/gitaly-backup/restore.go @@ -13,6 +13,7 @@ import ( log "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/v15/client" "gitlab.com/gitlab-org/gitaly/v15/internal/backup" + internalclient "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" ) @@ -50,7 +51,7 @@ func (cmd *restoreSubcommand) Run(ctx context.Context, stdin io.Reader, stdout i return fmt.Errorf("restore: resolve locator: %w", err) } - pool := client.NewPool() + pool := client.NewPool(internalclient.UnaryInterceptor(), internalclient.StreamInterceptor()) defer pool.Close() manager := backup.NewManager(sink, locator, pool, time.Now().UTC().Format("20060102150405")) diff --git a/cmd/gitaly-ssh/main.go b/cmd/gitaly-ssh/main.go index dc7989b50..7284ca6c0 100644 --- a/cmd/gitaly-ssh/main.go +++ b/cmd/gitaly-ssh/main.go @@ -11,6 +11,7 @@ import ( "github.com/sirupsen/logrus" gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth" "gitlab.com/gitlab-org/gitaly/v15/client" + internalclient "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag" "gitlab.com/gitlab-org/labkit/tracing" "google.golang.org/grpc" @@ -143,7 +144,7 @@ func dialOpts() []grpc.DialOption { connOpts = append(connOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token))) } - return connOpts + return append(connOpts, internalclient.UnaryInterceptor(), internalclient.StreamInterceptor()) } func useSidechannel() bool { return os.Getenv("GITALY_USE_SIDECHANNEL") == "1" } diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index 7306b09e8..e1097b866 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -25,6 +25,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v15/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v15/internal/git2go" + internalclient "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/sentry" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/hook" @@ -246,7 +247,11 @@ func run(cfg config.Cfg) error { conns := client.NewPoolWithOptions( client.WithDialer(client.HealthCheckDialer(client.DialContext)), - client.WithDialOptions(client.FailOnNonTempDialError()...), + client.WithDialOptions(append( + client.FailOnNonTempDialError(), + internalclient.UnaryInterceptor(), + internalclient.StreamInterceptor())..., + ), ) defer conns.Close() diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go index 523c41af5..78b3bcca4 100644 --- a/cmd/praefect/subcmd.go +++ b/cmd/praefect/subcmd.go @@ -12,6 +12,7 @@ import ( gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth" "gitlab.com/gitlab-org/gitaly/v15/client" + internalclient "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/service" @@ -121,6 +122,8 @@ func subCmdDial(ctx context.Context, addr, token string, timeout time.Duration, opts = append(opts, grpc.WithBlock(), + internalclient.UnaryInterceptor(), + internalclient.StreamInterceptor(), ) if len(token) > 0 { diff --git a/internal/git/gittest/repo.go b/internal/git/gittest/repo.go index 6438f0a6b..0086fcad0 100644 --- a/internal/git/gittest/repo.go +++ b/internal/git/gittest/repo.go @@ -15,6 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/client" "gitlab.com/gitlab-org/gitaly/v15/internal/git" "gitlab.com/gitlab-org/gitaly/v15/internal/git/repository" + internalclient "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v15/internal/helper/text" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" @@ -104,7 +105,7 @@ type CreateRepositoryConfig struct { } func dialService(ctx context.Context, tb testing.TB, cfg config.Cfg) *grpc.ClientConn { - dialOptions := []grpc.DialOption{} + dialOptions := []grpc.DialOption{internalclient.UnaryInterceptor(), internalclient.StreamInterceptor()} if cfg.Auth.Token != "" { dialOptions = append(dialOptions, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(cfg.Auth.Token))) } diff --git a/internal/gitaly/client/dial.go b/internal/gitaly/client/dial.go index 84800995d..b0b558543 100644 --- a/internal/gitaly/client/dial.go +++ b/internal/gitaly/client/dial.go @@ -129,11 +129,6 @@ func Dial(ctx context.Context, rawAddress string, connOpts []grpc.DialOption, ha Time: 20 * time.Second, PermitWithoutStream: true, }), - UnaryInterceptor(), - grpc.WithChainStreamInterceptor( - grpctracing.StreamClientTracingInterceptor(), - grpccorrelation.StreamClientCorrelationInterceptor(), - ), ) conn, err := grpc.DialContext(ctx, canonicalAddress, connOpts...) @@ -144,6 +139,14 @@ func Dial(ctx context.Context, rawAddress string, connOpts []grpc.DialOption, ha return conn, nil } +// StreamInterceptor returns the stream interceptors that should be configured for a client. +func StreamInterceptor() grpc.DialOption { + return grpc.WithChainStreamInterceptor( + grpctracing.StreamClientTracingInterceptor(), + grpccorrelation.StreamClientCorrelationInterceptor(), + ) +} + // UnaryInterceptor returns the unary interceptors that should be configured for a client. func UnaryInterceptor() grpc.DialOption { return grpc.WithChainUnaryInterceptor( diff --git a/internal/gitaly/service/repository/testhelper_test.go b/internal/gitaly/service/repository/testhelper_test.go index 24dc0d365..c7b1d4be7 100644 --- a/internal/gitaly/service/repository/testhelper_test.go +++ b/internal/gitaly/service/repository/testhelper_test.go @@ -67,7 +67,9 @@ func TestWithRubySidecar(t *testing.T) { } func newRepositoryClient(tb testing.TB, cfg config.Cfg, serverSocketPath string) gitalypb.RepositoryServiceClient { - var connOpts []grpc.DialOption + connOpts := []grpc.DialOption{ + internalclient.UnaryInterceptor(), internalclient.StreamInterceptor(), + } if cfg.Auth.Token != "" { connOpts = append(connOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(cfg.Auth.Token))) } diff --git a/internal/gitaly/transaction/manager.go b/internal/gitaly/transaction/manager.go index f3a7a0dba..1a80bd434 100644 --- a/internal/gitaly/transaction/manager.go +++ b/internal/gitaly/transaction/manager.go @@ -11,6 +11,7 @@ import ( "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/v15/client" "gitlab.com/gitlab-org/gitaly/v15/internal/backchannel" + internalclient "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v15/internal/transaction/txinfo" "gitlab.com/gitlab-org/gitaly/v15/internal/transaction/voting" @@ -66,7 +67,11 @@ type PoolManager struct { func NewManager(cfg config.Cfg, backchannels *backchannel.Registry) *PoolManager { return &PoolManager{ backchannels: backchannels, - conns: client.NewPoolWithOptions(client.WithDialOptions(client.FailOnNonTempDialError()...)), + conns: client.NewPoolWithOptions(client.WithDialOptions(append( + client.FailOnNonTempDialError(), + internalclient.UnaryInterceptor(), + internalclient.StreamInterceptor())..., + )), votingDelayMetric: prometheus.NewHistogram( prometheus.HistogramOpts{ Name: "gitaly_hook_transaction_voting_delay_seconds", diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index d91ef8d0a..422d98342 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -140,6 +140,8 @@ func Dial(ctx context.Context, node *config.Node, registry *protoregistry.Regist grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(node.Token)), grpc.WithChainStreamInterceptor(streamInterceptors...), grpc.WithChainUnaryInterceptor(unaryInterceptors...), + client.UnaryInterceptor(), + client.StreamInterceptor(), } return client.Dial(ctx, node.Address, dialOpts, handshaker) diff --git a/internal/praefect/nodes/ping.go b/internal/praefect/nodes/ping.go index 91caef19e..e0bcd9b9b 100644 --- a/internal/praefect/nodes/ping.go +++ b/internal/praefect/nodes/ping.go @@ -10,6 +10,7 @@ import ( gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth" "gitlab.com/gitlab-org/gitaly/v15/client" + internalclient "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" "google.golang.org/grpc" @@ -72,6 +73,8 @@ func (p *Ping) Address() string { func (p *Ping) dial(ctx context.Context) (*grpc.ClientConn, error) { opts := []grpc.DialOption{ grpc.WithBlock(), + internalclient.UnaryInterceptor(), + internalclient.StreamInterceptor(), } if len(p.token) > 0 { diff --git a/internal/praefect/service/checks.go b/internal/praefect/service/checks.go index 75ac9055d..41fd44de1 100644 --- a/internal/praefect/service/checks.go +++ b/internal/praefect/service/checks.go @@ -12,6 +12,7 @@ import ( migrate "github.com/rubenv/sql-migrate" gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth" "gitlab.com/gitlab-org/gitaly/v15/client" + internalclient "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/v15/internal/helper" "gitlab.com/gitlab-org/gitaly/v15/internal/helper/env" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" @@ -245,7 +246,11 @@ func NewClockSyncCheck(clockDriftCheck func(ntpHost string, driftThreshold time. for j := range conf.VirtualStorages[i].Nodes { node := conf.VirtualStorages[i].Nodes[j] g.Go(func() error { - opts := []grpc.DialOption{grpc.WithBlock()} + opts := []grpc.DialOption{ + grpc.WithBlock(), + internalclient.UnaryInterceptor(), + internalclient.StreamInterceptor(), + } if len(node.Token) > 0 { opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(node.Token))) } diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 07749aa46..db46ca039 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -19,6 +19,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/git/housekeeping" "gitlab.com/gitlab-org/gitaly/v15/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v15/internal/git2go" + internalclient "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/auth" gitalylog "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/log" @@ -127,6 +128,8 @@ func (gs GitalyServer) Address() string { func waitHealthy(ctx context.Context, tb testing.TB, addr string, authToken string) { grpcOpts := []grpc.DialOption{ grpc.WithBlock(), + internalclient.UnaryInterceptor(), + internalclient.StreamInterceptor(), } if authToken != "" { grpcOpts = append(grpcOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(authToken))) @@ -255,7 +258,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg, r } if gsd.conns == nil { - gsd.conns = client.NewPool() + gsd.conns = client.NewPool(internalclient.UnaryInterceptor(), internalclient.StreamInterceptor()) } if gsd.locator == nil { |