diff options
author | Toon Claes <toon@gitlab.com> | 2023-09-08 16:36:29 +0300 |
---|---|---|
committer | Toon Claes <toon@gitlab.com> | 2023-09-08 16:36:29 +0300 |
commit | e7e38b42d14bd849c49e24034f750fc210c226ae (patch) | |
tree | 63aeaceefcb49c1269ed2c463ac0c84ad382dbc6 | |
parent | 5c6c63b811c45b73b6b18e74203493c5696ee549 (diff) | |
parent | 2b86a6e586f3ae1a223bf21193366272e72623c7 (diff) |
Merge branch 'pks-client-split-external-and-internal' into 'master'
client: Strictly distinguish internal and public code
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6336
Merged-by: Toon Claes <toon@gitlab.com>
Approved-by: Justin Tobler <jtobler@gitlab.com>
Approved-by: Toon Claes <toon@gitlab.com>
Co-authored-by: Patrick Steinhardt <psteinhardt@gitlab.com>
57 files changed, 529 insertions, 389 deletions
diff --git a/.golangci.yml b/.golangci.yml index c1346546c..825be0e44 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -50,6 +50,8 @@ linters-settings: desc: "ioutil is deprecated starting with Go 1.16" - pkg: "gitlab.com/gitlab-org/labkit/log" desc: "use internal/log instead" + - pkg: "gitlab.com/gitlab-org/gitaly/v16/client" + desc: "use internal/grpc/client instead" errcheck: # The following are functions for which we are currently not consistently # checking returned errors. This is not intended as a list of known-okay @@ -58,7 +60,6 @@ linters-settings: exclude-functions: - (*database/sql.DB).Close - (*database/sql.Rows).Close - - (*gitlab.com/gitlab-org/gitaly/v16/client.Pool).Close - (*gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel.ServerConn).Close - (*gitlab.com/gitlab-org/gitaly/v16/internal/streamcache.pipe).Close - (*gitlab.com/gitlab-org/gitaly/v16/internal/streamcache.pipeReader).Close diff --git a/client/dial.go b/client/dial.go index beaa7489a..3bc103b4c 100644 --- a/client/dial.go +++ b/client/dial.go @@ -12,7 +12,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/dnsresolver" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel" "google.golang.org/grpc" - healthpb "google.golang.org/grpc/health/grpc_health_v1" ) // DefaultDialOpts hold the default DialOptions for connection to Gitaly over UNIX-socket @@ -42,34 +41,18 @@ func Dial(rawAddress string, connOpts []grpc.DialOption) (*grpc.ClientConn, erro // injects sr as a sidechannel registry, so that Gitaly can establish // sidechannels back to the client. func DialSidechannel(ctx context.Context, rawAddress string, sr *SidechannelRegistry, connOpts []grpc.DialOption) (*grpc.ClientConn, error) { - clientHandshaker := sidechannel.NewClientHandshaker(sr.logger, sr.registry) - return client.Dial(ctx, rawAddress, client.WithGrpcOptions(connOpts), client.WithHandshaker(clientHandshaker)) + return sidechannel.Dial(ctx, sr.registry, sr.logger, rawAddress, connOpts) } // FailOnNonTempDialError helps to identify if remote listener is ready to accept new connections. func FailOnNonTempDialError() []grpc.DialOption { - return []grpc.DialOption{ - grpc.WithBlock(), - grpc.FailOnNonTempDialError(true), - } + return client.FailOnNonTempDialError() } // HealthCheckDialer uses provided dialer as an actual dialer, but issues a health check request to the remote // to verify the connection was set properly and could be used with no issues. func HealthCheckDialer(base Dialer) Dialer { - return func(ctx context.Context, address string, dialOptions []grpc.DialOption) (*grpc.ClientConn, error) { - cc, err := base(ctx, address, dialOptions) - if err != nil { - return nil, err - } - - if _, err := healthpb.NewHealthClient(cc).Check(ctx, &healthpb.HealthCheckRequest{}); err != nil { - _ = cc.Close() - return nil, err - } - - return cc, nil - } + return Dialer(client.HealthCheckDialer(client.Dialer(base))) } // DNSResolverBuilderConfig exposes the DNS resolver builder option. It is used to build Gitaly diff --git a/client/pool.go b/client/pool.go index ec430f73e..1354f726c 100644 --- a/client/pool.go +++ b/client/pool.go @@ -2,26 +2,30 @@ package client import ( "context" - "errors" - "fmt" - "sync" - gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "google.golang.org/grpc" ) +// PoolOption is an option that can be passed to NewPoolWithOptions. +type PoolOption = client.PoolOption + // Dialer is used by the Pool to create a *grpc.ClientConn. type Dialer func(ctx context.Context, address string, dialOptions []grpc.DialOption) (*grpc.ClientConn, error) -type poolKey struct{ address, token string } +// WithDialer sets the dialer that is called for each new gRPC connection the pool establishes. +func WithDialer(dialer Dialer) PoolOption { + return client.WithDialer(client.Dialer(dialer)) +} + +// WithDialOptions sets gRPC options to use for the gRPC Dial call. +func WithDialOptions(dialOptions ...grpc.DialOption) PoolOption { + return client.WithDialOptions(dialOptions...) +} -// Pool is a pool of GRPC connections. Connections created by it are safe for -// concurrent use. +// Pool is a pool of GRPC connections. Connections created by it are safe for concurrent use. type Pool struct { - lock sync.RWMutex - conns map[poolKey]*grpc.ClientConn - dialer Dialer - dialOptions []grpc.DialOption + pool *client.Pool } // NewPool creates a new connection pool that's ready for use. @@ -31,73 +35,19 @@ func NewPool(dialOptions ...grpc.DialOption) *Pool { // NewPoolWithOptions creates a new connection pool that's ready for use. func NewPoolWithOptions(poolOptions ...PoolOption) *Pool { - opts := applyPoolOptions(poolOptions) return &Pool{ - conns: make(map[poolKey]*grpc.ClientConn), - dialer: opts.dialer, - dialOptions: opts.dialOptions, - } -} - -// Close closes all connections tracked by the connection pool. -func (p *Pool) Close() error { - p.lock.Lock() - defer p.lock.Unlock() - - var firstError error - for addr, conn := range p.conns { - if err := conn.Close(); err != nil && firstError == nil { - firstError = err - } - - delete(p.conns, addr) + pool: client.NewPool(poolOptions...), } - - return firstError } // Dial creates a new client connection in case no connection to the given // address exists already or returns an already established connection. The // returned address must not be `Close()`d. func (p *Pool) Dial(ctx context.Context, address, token string) (*grpc.ClientConn, error) { - return p.getOrCreateConnection(ctx, address, token) + return p.pool.Dial(ctx, address, token) } -func (p *Pool) getOrCreateConnection(ctx context.Context, address, token string) (*grpc.ClientConn, error) { - if address == "" { - return nil, errors.New("address is empty") - } - - key := poolKey{address: address, token: token} - - p.lock.RLock() - cc, ok := p.conns[key] - p.lock.RUnlock() - - if ok { - return cc, nil - } - - p.lock.Lock() - defer p.lock.Unlock() - - cc, ok = p.conns[key] - if ok { - return cc, nil - } - - opts := make([]grpc.DialOption, 0, len(p.dialOptions)+1) - opts = append(opts, p.dialOptions...) - if token != "" { - opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token))) - } - - cc, err := p.dialer(ctx, address, opts) - if err != nil { - return nil, fmt.Errorf("could not dial source: %w", err) - } - - p.conns[key] = cc - - return cc, nil +// Close closes all connections tracked by the connection pool. +func (p *Pool) Close() error { + return p.pool.Close() } diff --git a/client/pool_options.go b/client/pool_options.go deleted file mode 100644 index a8139c1f2..000000000 --- a/client/pool_options.go +++ /dev/null @@ -1,39 +0,0 @@ -package client - -import "google.golang.org/grpc" - -type poolOptions struct { - dialer Dialer - dialOptions []grpc.DialOption -} - -//nolint:revive // This is unintentionally missing documentation. -type PoolOption func(*poolOptions) - -func applyPoolOptions(options []PoolOption) *poolOptions { - opts := defaultPoolOptions() - for _, opt := range options { - opt(opts) - } - return opts -} - -func defaultPoolOptions() *poolOptions { - return &poolOptions{ - dialer: DialContext, - } -} - -// WithDialer sets the dialer that is called for each new gRPC connection the pool establishes. -func WithDialer(dialer Dialer) PoolOption { - return func(options *poolOptions) { - options.dialer = dialer - } -} - -// WithDialOptions sets gRPC options to use for the gRPC Dial call. -func WithDialOptions(dialOptions ...grpc.DialOption) PoolOption { - return func(options *poolOptions) { - options.dialOptions = dialOptions - } -} diff --git a/client/pool_test.go b/client/pool_test.go index af29f4bb7..262fa85a5 100644 --- a/client/pool_test.go +++ b/client/pool_test.go @@ -230,7 +230,7 @@ func TestPool_Dial_same_addr_another_token(t *testing.T) { defer func() { stop1() }() pool := NewPool() - defer pool.Close() + defer testhelper.MustClose(t, pool) // all good - server is running and serving requests conn, err := pool.Dial(ctx, addr, "") diff --git a/client/receive_pack.go b/client/receive_pack.go index c59f87d06..2d8d68e44 100644 --- a/client/receive_pack.go +++ b/client/receive_pack.go @@ -4,39 +4,12 @@ import ( "context" "io" - "gitlab.com/gitlab-org/gitaly/v16/internal/stream" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/gitalyclient" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" - "gitlab.com/gitlab-org/gitaly/v16/streamio" "google.golang.org/grpc" ) // ReceivePack proxies an SSH git-receive-pack (git push) session to Gitaly func ReceivePack(ctx context.Context, conn *grpc.ClientConn, stdin io.Reader, stdout, stderr io.Writer, req *gitalypb.SSHReceivePackRequest) (int32, error) { - ctx2, cancel := context.WithCancel(ctx) - defer cancel() - - ssh := gitalypb.NewSSHServiceClient(conn) - receivePackStream, err := ssh.SSHReceivePack(ctx2) - if err != nil { - return 0, err - } - - if err = receivePackStream.Send(req); err != nil { - return 0, err - } - - inWriter := streamio.NewWriter(func(p []byte) error { - return receivePackStream.Send(&gitalypb.SSHReceivePackRequest{Stdin: p}) - }) - - return stream.Handler(func() (stream.StdoutStderrResponse, error) { - return receivePackStream.Recv() - }, func(errC chan error) { - _, errRecv := io.Copy(inWriter, stdin) - if err := receivePackStream.CloseSend(); err != nil && errRecv == nil { - errC <- err - } else { - errC <- errRecv - } - }, stdout, stderr) + return gitalyclient.ReceivePack(ctx, conn, stdin, stdout, stderr, req) } diff --git a/client/upload_archive.go b/client/upload_archive.go index 654e7c583..572e6bc9b 100644 --- a/client/upload_archive.go +++ b/client/upload_archive.go @@ -4,39 +4,12 @@ import ( "context" "io" - "gitlab.com/gitlab-org/gitaly/v16/internal/stream" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/gitalyclient" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" - "gitlab.com/gitlab-org/gitaly/v16/streamio" "google.golang.org/grpc" ) // UploadArchive proxies an SSH git-upload-archive (git archive --remote) session to Gitaly func UploadArchive(ctx context.Context, conn *grpc.ClientConn, stdin io.Reader, stdout, stderr io.Writer, req *gitalypb.SSHUploadArchiveRequest) (int32, error) { - ctx2, cancel := context.WithCancel(ctx) - defer cancel() - - ssh := gitalypb.NewSSHServiceClient(conn) - uploadPackStream, err := ssh.SSHUploadArchive(ctx2) - if err != nil { - return 0, err - } - - if err = uploadPackStream.Send(req); err != nil { - return 0, err - } - - inWriter := streamio.NewWriter(func(p []byte) error { - return uploadPackStream.Send(&gitalypb.SSHUploadArchiveRequest{Stdin: p}) - }) - - return stream.Handler(func() (stream.StdoutStderrResponse, error) { - return uploadPackStream.Recv() - }, func(errC chan error) { - _, errRecv := io.Copy(inWriter, stdin) - if err := uploadPackStream.CloseSend(); err != nil && errRecv == nil { - errC <- err - } else { - errC <- errRecv - } - }, stdout, stderr) + return gitalyclient.UploadArchive(ctx, conn, stdin, stdout, stderr, req) } diff --git a/client/upload_pack.go b/client/upload_pack.go index 65d81746e..4ad17dcb4 100644 --- a/client/upload_pack.go +++ b/client/upload_pack.go @@ -4,48 +4,18 @@ import ( "context" "io" - "gitlab.com/gitlab-org/gitaly/v16/internal/stream" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/gitalyclient" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" - "gitlab.com/gitlab-org/gitaly/v16/streamio" "google.golang.org/grpc" ) // UploadPack proxies an SSH git-upload-pack (git fetch) session to Gitaly func UploadPack(ctx context.Context, conn *grpc.ClientConn, stdin io.Reader, stdout, stderr io.Writer, req *gitalypb.SSHUploadPackRequest) (int32, error) { - ctx2, cancel := context.WithCancel(ctx) - defer cancel() - - ssh := gitalypb.NewSSHServiceClient(conn) - uploadPackStream, err := ssh.SSHUploadPack(ctx2) - if err != nil { - return 0, err - } - - if err = uploadPackStream.Send(req); err != nil { - return 0, err - } - - inWriter := streamio.NewWriter(func(p []byte) error { - return uploadPackStream.Send(&gitalypb.SSHUploadPackRequest{Stdin: p}) - }) - - return stream.Handler(func() (stream.StdoutStderrResponse, error) { - return uploadPackStream.Recv() - }, func(errC chan error) { - _, errRecv := io.Copy(inWriter, stdin) - if err := uploadPackStream.CloseSend(); err != nil && errRecv == nil { - errC <- err - } else { - errC <- errRecv - } - }, stdout, stderr) + return gitalyclient.UploadPack(ctx, conn, stdin, stdout, stderr, req) } // UploadPackResult wraps ExitCode and PackfileNegotiationStatistics. -type UploadPackResult struct { - ExitCode int32 - PackfileNegotiationStatistics *gitalypb.PackfileNegotiationStatistics -} +type UploadPackResult = gitalyclient.UploadPackResult // UploadPackWithSidechannelWithResult proxies an SSH git-upload-pack (git fetch) // session to Gitaly using a sidechannel for the raw data transfer. @@ -57,31 +27,7 @@ func UploadPackWithSidechannelWithResult( stdout, stderr io.Writer, req *gitalypb.SSHUploadPackWithSidechannelRequest, ) (UploadPackResult, error) { - result := UploadPackResult{} - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - ctx, wt := reg.Register(ctx, func(c SidechannelConn) error { - return stream.ProxyPktLine(c, stdin, stdout, stderr) - }) - defer func() { - // We aleady check the error further down. - _ = wt.Close() - }() - - sshClient := gitalypb.NewSSHServiceClient(conn) - resp, err := sshClient.SSHUploadPackWithSidechannel(ctx, req) - if err != nil { - return result, err - } - result.ExitCode = 0 - result.PackfileNegotiationStatistics = resp.PackfileNegotiationStatistics - - if err := wt.Close(); err != nil { - return result, err - } - - return result, nil + return gitalyclient.UploadPackWithSidechannel(ctx, conn, reg.registry, stdin, stdout, stderr, req) } // UploadPackWithSidechannel proxies an SSH git-upload-pack (git fetch) diff --git a/cmd/gitaly-backup/create.go b/cmd/gitaly-backup/create.go index 010f1055a..f2fb543af 100644 --- a/cmd/gitaly-backup/create.go +++ b/cmd/gitaly-backup/create.go @@ -10,10 +10,9 @@ import ( "time" "github.com/sirupsen/logrus" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/backup" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" - internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -45,8 +44,10 @@ func (cmd *createSubcommand) Flags(fs *flag.FlagSet) { } func (cmd *createSubcommand) Run(ctx context.Context, logger logrus.FieldLogger, stdin io.Reader, stdout io.Writer) error { - pool := client.NewPool(internalclient.UnaryInterceptor(), internalclient.StreamInterceptor()) - defer pool.Close() + pool := client.NewPool(client.WithDialOptions(client.UnaryInterceptor(), client.StreamInterceptor())) + defer func() { + _ = pool.Close() + }() var manager backup.Strategy if cmd.serverSide { diff --git a/cmd/gitaly-backup/restore.go b/cmd/gitaly-backup/restore.go index c5a5c8fc5..d2d0670fc 100644 --- a/cmd/gitaly-backup/restore.go +++ b/cmd/gitaly-backup/restore.go @@ -11,10 +11,9 @@ import ( "strings" "github.com/sirupsen/logrus" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/backup" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" - internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -50,8 +49,10 @@ func (cmd *restoreSubcommand) Flags(fs *flag.FlagSet) { } func (cmd *restoreSubcommand) Run(ctx context.Context, logger logrus.FieldLogger, stdin io.Reader, stdout io.Writer) error { - pool := client.NewPool(internalclient.UnaryInterceptor(), internalclient.StreamInterceptor()) - defer pool.Close() + pool := client.NewPool(client.WithDialOptions(client.UnaryInterceptor(), client.StreamInterceptor())) + defer func() { + _ = pool.Close() + }() var manager backup.Strategy if cmd.serverSide { diff --git a/cmd/gitaly-hooks/hooks.go b/cmd/gitaly-hooks/hooks.go index a19873cf0..d8bd931fb 100644 --- a/cmd/gitaly-hooks/hooks.go +++ b/cmd/gitaly-hooks/hooks.go @@ -12,10 +12,10 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/sirupsen/logrus" gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/env" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/log" @@ -175,7 +175,7 @@ func executeHook(ctx context.Context, cmd hookCommand, args []string) error { ctx = injectMetadataIntoOutgoingCtx(ctx, payload) - conn, err := dialGitaly(payload) + conn, err := dialGitaly(ctx, payload) if err != nil { return fmt.Errorf("error when connecting to gitaly: %w", err) } @@ -207,8 +207,8 @@ func injectMetadataIntoOutgoingCtx(ctx context.Context, payload git.HooksPayload func noopSender(c chan error) {} -func dialGitaly(payload git.HooksPayload) (*grpc.ClientConn, error) { - dialOpts := client.DefaultDialOpts +func dialGitaly(ctx context.Context, payload git.HooksPayload) (*grpc.ClientConn, error) { + var dialOpts []grpc.DialOption if payload.InternalSocketToken != "" { dialOpts = append(dialOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(payload.InternalSocketToken))) } @@ -234,7 +234,7 @@ func dialGitaly(payload git.HooksPayload) (*grpc.ClientConn, error) { dialOpts = append(dialOpts, grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(unaryInterceptors...))) dialOpts = append(dialOpts, grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(streamInterceptors...))) - conn, err := client.Dial("unix://"+payload.InternalSocket, dialOpts) + conn, err := client.Dial(ctx, "unix://"+payload.InternalSocket, client.WithGrpcOptions(dialOpts)) if err != nil { return nil, fmt.Errorf("error when dialing: %w", err) } diff --git a/cmd/gitaly-ssh/main.go b/cmd/gitaly-ssh/main.go index 121186378..5b1bf262f 100644 --- a/cmd/gitaly-ssh/main.go +++ b/cmd/gitaly-ssh/main.go @@ -9,15 +9,15 @@ import ( "github.com/sirupsen/logrus" gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" - internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/labkit/tracing" "google.golang.org/grpc" ) -type packFn func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry, string) (int32, error) +type packFn func(context.Context, *grpc.ClientConn, *sidechannel.Registry, string) (int32, error) type gitalySSHCommand struct { // The git packer that shall be executed. One of receivePack, @@ -117,8 +117,8 @@ func (cmd gitalySSHCommand) run(logger logrus.FieldLogger) (int, error) { } } - registry := client.NewSidechannelRegistry(logger) - conn, err := getConnection(ctx, cmd.address, registry) + registry := sidechannel.NewRegistry() + conn, err := getConnection(ctx, cmd.address, registry, logger) if err != nil { return 1, err } @@ -132,25 +132,25 @@ func (cmd gitalySSHCommand) run(logger logrus.FieldLogger) (int, error) { return int(code), nil } -func getConnection(ctx context.Context, url string, registry *client.SidechannelRegistry) (*grpc.ClientConn, error) { +func getConnection(ctx context.Context, url string, registry *sidechannel.Registry, logger logrus.FieldLogger) (*grpc.ClientConn, error) { if url == "" { return nil, fmt.Errorf("gitaly address can not be empty") } if useSidechannel() { - return client.DialSidechannel(ctx, url, registry, dialOpts()) + return sidechannel.Dial(ctx, registry, logger, url, dialOpts()) } - return client.DialContext(ctx, url, dialOpts()) + return client.Dial(ctx, url, client.WithGrpcOptions(dialOpts())) } func dialOpts() []grpc.DialOption { - connOpts := client.DefaultDialOpts + var connOpts []grpc.DialOption if token := os.Getenv("GITALY_TOKEN"); token != "" { connOpts = append(connOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token))) } - return append(connOpts, internalclient.UnaryInterceptor(), internalclient.StreamInterceptor()) + return append(connOpts, client.UnaryInterceptor(), client.StreamInterceptor()) } func useSidechannel() bool { return os.Getenv("GITALY_USE_SIDECHANNEL") == "1" } diff --git a/cmd/gitaly-ssh/main_test.go b/cmd/gitaly-ssh/main_test.go index 258509e9c..640b7e5e6 100644 --- a/cmd/gitaly-ssh/main_test.go +++ b/cmd/gitaly-ssh/main_test.go @@ -7,19 +7,19 @@ import ( "testing" "github.com/stretchr/testify/assert" - "gitlab.com/gitlab-org/gitaly/v16/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "google.golang.org/grpc" ) func TestRun(t *testing.T) { - var successPacker packFn = func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry, string) (int32, error) { + var successPacker packFn = func(context.Context, *grpc.ClientConn, *sidechannel.Registry, string) (int32, error) { return 0, nil } - var exitCodePacker packFn = func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry, string) (int32, error) { + var exitCodePacker packFn = func(context.Context, *grpc.ClientConn, *sidechannel.Registry, string) (int32, error) { return 123, nil } - var errorPacker packFn = func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry, string) (int32, error) { + var errorPacker packFn = func(context.Context, *grpc.ClientConn, *sidechannel.Registry, string) (int32, error) { return 1, fmt.Errorf("fail") } diff --git a/cmd/gitaly-ssh/receive_pack.go b/cmd/gitaly-ssh/receive_pack.go index f5cc8a51f..268e3d69f 100644 --- a/cmd/gitaly-ssh/receive_pack.go +++ b/cmd/gitaly-ssh/receive_pack.go @@ -5,13 +5,14 @@ import ( "fmt" "os" - "gitlab.com/gitlab-org/gitaly/v16/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/gitalyclient" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "google.golang.org/grpc" "google.golang.org/protobuf/encoding/protojson" ) -func receivePack(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry, req string) (int32, error) { +func receivePack(ctx context.Context, conn *grpc.ClientConn, registry *sidechannel.Registry, req string) (int32, error) { var request gitalypb.SSHReceivePackRequest if err := protojson.Unmarshal([]byte(req), &request); err != nil { @@ -21,5 +22,5 @@ func receivePack(ctx context.Context, conn *grpc.ClientConn, registry *client.Si ctx, cancel := context.WithCancel(ctx) defer cancel() - return client.ReceivePack(ctx, conn, os.Stdin, os.Stdout, os.Stderr, &request) + return gitalyclient.ReceivePack(ctx, conn, os.Stdin, os.Stdout, os.Stderr, &request) } diff --git a/cmd/gitaly-ssh/upload_archive.go b/cmd/gitaly-ssh/upload_archive.go index de1d434f7..f92ffa965 100644 --- a/cmd/gitaly-ssh/upload_archive.go +++ b/cmd/gitaly-ssh/upload_archive.go @@ -5,13 +5,14 @@ import ( "fmt" "os" - "gitlab.com/gitlab-org/gitaly/v16/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/gitalyclient" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "google.golang.org/grpc" "google.golang.org/protobuf/encoding/protojson" ) -func uploadArchive(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry, req string) (int32, error) { +func uploadArchive(ctx context.Context, conn *grpc.ClientConn, registry *sidechannel.Registry, req string) (int32, error) { var request gitalypb.SSHUploadArchiveRequest if err := protojson.Unmarshal([]byte(req), &request); err != nil { return 0, fmt.Errorf("json unmarshal: %w", err) @@ -20,5 +21,5 @@ func uploadArchive(ctx context.Context, conn *grpc.ClientConn, registry *client. ctx, cancel := context.WithCancel(ctx) defer cancel() - return client.UploadArchive(ctx, conn, os.Stdin, os.Stdout, os.Stderr, &request) + return gitalyclient.UploadArchive(ctx, conn, os.Stdin, os.Stdout, os.Stderr, &request) } diff --git a/cmd/gitaly-ssh/upload_pack.go b/cmd/gitaly-ssh/upload_pack.go index b4a9e0755..d300a3fc0 100644 --- a/cmd/gitaly-ssh/upload_pack.go +++ b/cmd/gitaly-ssh/upload_pack.go @@ -5,7 +5,8 @@ import ( "fmt" "os" - "gitlab.com/gitlab-org/gitaly/v16/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/gitalyclient" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "google.golang.org/grpc" "google.golang.org/protobuf/encoding/protojson" @@ -22,7 +23,7 @@ func uploadPackConfig(config []string) []string { return append([]string{GitConfigShowAllRefs}, config...) } -func uploadPack(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry, req string) (int32, error) { +func uploadPack(ctx context.Context, conn *grpc.ClientConn, registry *sidechannel.Registry, req string) (int32, error) { var request gitalypb.SSHUploadPackRequest if err := protojson.Unmarshal([]byte(req), &request); err != nil { return 0, fmt.Errorf("json unmarshal: %w", err) @@ -33,10 +34,10 @@ func uploadPack(ctx context.Context, conn *grpc.ClientConn, registry *client.Sid ctx, cancel := context.WithCancel(ctx) defer cancel() - return client.UploadPack(ctx, conn, os.Stdin, os.Stdout, os.Stderr, &request) + return gitalyclient.UploadPack(ctx, conn, os.Stdin, os.Stdout, os.Stderr, &request) } -func uploadPackWithSidechannel(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry, req string) (int32, error) { +func uploadPackWithSidechannel(ctx context.Context, conn *grpc.ClientConn, registry *sidechannel.Registry, req string) (int32, error) { var request gitalypb.SSHUploadPackWithSidechannelRequest if err := protojson.Unmarshal([]byte(req), &request); err != nil { return 0, fmt.Errorf("json unmarshal: %w", err) @@ -47,5 +48,10 @@ func uploadPackWithSidechannel(ctx context.Context, conn *grpc.ClientConn, regis ctx, cancel := context.WithCancel(ctx) defer cancel() - return client.UploadPackWithSidechannel(ctx, conn, registry, os.Stdin, os.Stdout, os.Stderr, &request) + result, err := gitalyclient.UploadPackWithSidechannel(ctx, conn, registry, os.Stdin, os.Stdout, os.Stderr, &request) + if err != nil { + return 0, err + } + + return result.ExitCode, nil } diff --git a/internal/backup/backup.go b/internal/backup/backup.go index fd308abd2..267ec5f99 100644 --- a/internal/backup/backup.go +++ b/internal/backup/backup.go @@ -7,13 +7,13 @@ import ( "fmt" "io" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go index eb4fa2011..ac7b155c0 100644 --- a/internal/backup/backup_test.go +++ b/internal/backup/backup_test.go @@ -9,7 +9,6 @@ import ( "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/archive" "gitlab.com/gitlab-org/gitaly/v16/internal/backup" "gitlab.com/gitlab-org/gitaly/v16/internal/git" @@ -20,6 +19,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" @@ -430,7 +430,7 @@ func TestManager_Restore_latest(t *testing.T) { ctx := testhelper.Context(t) - cc, err := client.Dial(cfg.SocketPath, nil) + cc, err := client.Dial(ctx, cfg.SocketPath) require.NoError(t, err) defer testhelper.MustClose(t, cc) @@ -773,7 +773,7 @@ func TestManager_Restore_specific(t *testing.T) { ctx := testhelper.Context(t) - cc, err := client.Dial(cfg.SocketPath, nil) + cc, err := client.Dial(ctx, cfg.SocketPath) require.NoError(t, err) defer testhelper.MustClose(t, cc) diff --git a/internal/backup/server_side.go b/internal/backup/server_side.go index 1a2cf6b41..ecfb08d5d 100644 --- a/internal/backup/server_side.go +++ b/internal/backup/server_side.go @@ -4,8 +4,8 @@ import ( "context" "fmt" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "google.golang.org/grpc/codes" diff --git a/internal/backup/server_side_test.go b/internal/backup/server_side_test.go index 5d69b70b2..13d375d69 100644 --- a/internal/backup/server_side_test.go +++ b/internal/backup/server_side_test.go @@ -6,13 +6,13 @@ import ( "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/backup" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 989c175c3..9822b2ab4 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -13,7 +13,6 @@ import ( "github.com/sirupsen/logrus" "github.com/urfave/cli/v2" "gitlab.com/gitlab-org/gitaly/v16" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/backup" "gitlab.com/gitlab-org/gitaly/v16/internal/bootstrap" "gitlab.com/gitlab-org/gitaly/v16/internal/bootstrap/starter" @@ -36,7 +35,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/gitlab" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel" - internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/env" @@ -257,15 +256,21 @@ func run(cfg config.Cfg, logger logrus.FieldLogger) error { hookManager = hm } - conns := client.NewPoolWithOptions( - client.WithDialer(client.HealthCheckDialer(client.DialContext)), + conns := client.NewPool( + client.WithDialer(client.HealthCheckDialer( + func(ctx context.Context, address string, opts []grpc.DialOption) (*grpc.ClientConn, error) { + return client.Dial(ctx, address, client.WithGrpcOptions(opts)) + }, + )), client.WithDialOptions(append( client.FailOnNonTempDialError(), - internalclient.UnaryInterceptor(), - internalclient.StreamInterceptor())..., + client.UnaryInterceptor(), + client.StreamInterceptor())..., ), ) - defer conns.Close() + defer func() { + _ = conns.Close() + }() catfileCache := catfile.NewCache(cfg) defer catfileCache.Stop() diff --git a/internal/cli/gitaly/subcmd_hooks.go b/internal/cli/gitaly/subcmd_hooks.go index f0d0946d6..16b2e08c9 100644 --- a/internal/cli/gitaly/subcmd_hooks.go +++ b/internal/cli/gitaly/subcmd_hooks.go @@ -9,9 +9,8 @@ import ( "github.com/urfave/cli/v2" gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" - internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v16/streamio" @@ -151,8 +150,8 @@ func dial(ctx context.Context, addr, token string, timeout time.Duration, opts . opts = append(opts, grpc.WithBlock(), - internalclient.UnaryInterceptor(), - internalclient.StreamInterceptor(), + client.UnaryInterceptor(), + client.StreamInterceptor(), ) if len(token) > 0 { @@ -163,7 +162,7 @@ func dial(ctx context.Context, addr, token string, timeout time.Duration, opts . ) } - return client.DialContext(ctx, addr, opts) + return client.Dial(ctx, addr, client.WithGrpcOptions(opts)) } func getAddressWithScheme(cfg config.Cfg) (string, error) { diff --git a/internal/cli/gitaly/subcmd_hooks_test.go b/internal/cli/gitaly/subcmd_hooks_test.go index a3287c830..338a72c0e 100644 --- a/internal/cli/gitaly/subcmd_hooks_test.go +++ b/internal/cli/gitaly/subcmd_hooks_test.go @@ -2,6 +2,7 @@ package gitaly import ( "bytes" + "context" "io" "io/fs" "os/exec" @@ -10,12 +11,11 @@ import ( "github.com/stretchr/testify/require" gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth" - gclient "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" - internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver" @@ -41,7 +41,7 @@ func TestSetHooksSubcommand(t *testing.T) { // The generated socket path already has the unix prefix. This needs to be // removed because the Gitaly config does not expect a scheme to be present. cfg.SocketPath = strings.TrimPrefix(serverSocketPath, "unix://") - client := newRepositoryClient(t, cfg, serverSocketPath) + client := newRepositoryClient(t, ctx, cfg, serverSocketPath) configPath := testcfg.WriteTemporaryGitalyConfigFile(t, cfg) @@ -216,16 +216,16 @@ func TestSetHooksSubcommand(t *testing.T) { } } -func newRepositoryClient(tb testing.TB, cfg config.Cfg, serverSocketPath string) gitalypb.RepositoryServiceClient { +func newRepositoryClient(tb testing.TB, ctx context.Context, cfg config.Cfg, serverSocketPath string) gitalypb.RepositoryServiceClient { tb.Helper() connOpts := []grpc.DialOption{ - internalclient.UnaryInterceptor(), internalclient.StreamInterceptor(), + client.UnaryInterceptor(), client.StreamInterceptor(), } if cfg.Auth.Token != "" { connOpts = append(connOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(cfg.Auth.Token))) } - conn, err := gclient.Dial(serverSocketPath, connOpts) + conn, err := client.Dial(ctx, serverSocketPath, client.WithGrpcOptions(connOpts)) require.NoError(tb, err) tb.Cleanup(func() { require.NoError(tb, conn.Close()) }) diff --git a/internal/cli/praefect/subcmd.go b/internal/cli/praefect/subcmd.go index aa7a2d623..bb171b2a3 100644 --- a/internal/cli/praefect/subcmd.go +++ b/internal/cli/praefect/subcmd.go @@ -9,8 +9,7 @@ import ( "time" gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth" - "gitlab.com/gitlab-org/gitaly/v16/client" - internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore/glsql" "google.golang.org/grpc" @@ -62,8 +61,8 @@ func subCmdDial(ctx context.Context, addr, token string, timeout time.Duration, opts = append(opts, grpc.WithBlock(), - internalclient.UnaryInterceptor(), - internalclient.StreamInterceptor(), + client.UnaryInterceptor(), + client.StreamInterceptor(), ) if len(token) > 0 { @@ -74,7 +73,7 @@ func subCmdDial(ctx context.Context, addr, token string, timeout time.Duration, ) } - return client.DialContext(ctx, addr, opts) + return client.Dial(ctx, addr, client.WithGrpcOptions(opts)) } type requiredParameterError string diff --git a/internal/cli/praefect/subcmd_list_untracked_repositories_test.go b/internal/cli/praefect/subcmd_list_untracked_repositories_test.go index 694f99040..560f22217 100644 --- a/internal/cli/praefect/subcmd_list_untracked_repositories_test.go +++ b/internal/cli/praefect/subcmd_list_untracked_repositories_test.go @@ -11,9 +11,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/urfave/cli/v2" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" @@ -24,6 +24,9 @@ import ( func TestListUntrackedRepositoriesCommand(t *testing.T) { t.Parallel() + + ctx := testhelper.Context(t) + g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1")) g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2")) @@ -53,11 +56,10 @@ func TestListUntrackedRepositoriesCommand(t *testing.T) { praefectServer := testserver.StartPraefect(t, conf) - cc, err := client.Dial(praefectServer.Address(), nil) + cc, err := client.Dial(ctx, praefectServer.Address()) require.NoError(t, err) defer func() { require.NoError(t, cc.Close()) }() repoClient := gitalypb.NewRepositoryServiceClient(cc) - ctx := testhelper.Context(t) praefectStorage := conf.VirtualStorages[0].Name diff --git a/internal/cli/praefect/subcmd_remove_repository_test.go b/internal/cli/praefect/subcmd_remove_repository_test.go index 489524e36..faf277824 100644 --- a/internal/cli/praefect/subcmd_remove_repository_test.go +++ b/internal/cli/praefect/subcmd_remove_repository_test.go @@ -12,10 +12,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/urfave/cli/v2" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" gitalycfg "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" @@ -30,6 +30,8 @@ import ( func TestRemoveRepositorySubcommand(t *testing.T) { t.Parallel() + ctx := testhelper.Context(t) + g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1")) g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2")) @@ -61,11 +63,10 @@ func TestRemoveRepositorySubcommand(t *testing.T) { praefectServer := testserver.StartPraefect(t, conf) - cc, err := client.Dial(praefectServer.Address(), nil) + cc, err := client.Dial(ctx, praefectServer.Address()) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, cc.Close()) }) repoClient := gitalypb.NewRepositoryServiceClient(cc) - ctx := testhelper.Context(t) praefectStorage := conf.VirtualStorages[0].Name diff --git a/internal/cli/praefect/subcmd_track_repositories_test.go b/internal/cli/praefect/subcmd_track_repositories_test.go index d5371a32c..85af24ef2 100644 --- a/internal/cli/praefect/subcmd_track_repositories_test.go +++ b/internal/cli/praefect/subcmd_track_repositories_test.go @@ -11,8 +11,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" @@ -28,6 +28,9 @@ import ( func TestTrackRepositoriesSubcommand(t *testing.T) { t.Parallel() + + ctx := testhelper.Context(t) + g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1")) g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2")) testcfg.BuildGitalyHooks(t, g2Cfg) @@ -65,10 +68,9 @@ func TestTrackRepositoriesSubcommand(t *testing.T) { } confPath := writeConfigToFile(t, conf) - gitalyCC, err := client.Dial(g1Addr, nil) + gitalyCC, err := client.Dial(ctx, g1Addr) require.NoError(t, err) defer testhelper.MustClose(t, gitalyCC) - ctx := testhelper.Context(t) gitaly1RepositoryClient := gitalypb.NewRepositoryServiceClient(gitalyCC) diff --git a/internal/cli/praefect/subcmd_track_repository_test.go b/internal/cli/praefect/subcmd_track_repository_test.go index bd10b3411..f1b2e5308 100644 --- a/internal/cli/praefect/subcmd_track_repository_test.go +++ b/internal/cli/praefect/subcmd_track_repository_test.go @@ -7,8 +7,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" @@ -23,6 +23,9 @@ import ( func TestTrackRepositorySubcommand(t *testing.T) { t.Parallel() + + ctx := testhelper.Context(t) + g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1")) g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2")) testcfg.BuildGitalyHooks(t, g2Cfg) @@ -60,10 +63,9 @@ func TestTrackRepositorySubcommand(t *testing.T) { } confPath := writeConfigToFile(t, conf) - gitalyCC, err := client.Dial(g1Addr, nil) + gitalyCC, err := client.Dial(ctx, g1Addr) require.NoError(t, err) defer func() { require.NoError(t, gitalyCC.Close()) }() - ctx := testhelper.Context(t) gitaly1RepositoryClient := gitalypb.NewRepositoryServiceClient(gitalyCC) diff --git a/internal/git/gittest/repo.go b/internal/git/gittest/repo.go index 68edeeecb..0e1878dbf 100644 --- a/internal/git/gittest/repo.go +++ b/internal/git/gittest/repo.go @@ -11,11 +11,10 @@ import ( "github.com/stretchr/testify/require" gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" - internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/text" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" @@ -97,7 +96,7 @@ type CreateRepositoryConfig struct { func dialService(tb testing.TB, ctx context.Context, cfg config.Cfg) *grpc.ClientConn { tb.Helper() - dialOptions := []grpc.DialOption{internalclient.UnaryInterceptor(), internalclient.StreamInterceptor()} + dialOptions := []grpc.DialOption{client.UnaryInterceptor(), client.StreamInterceptor()} if cfg.Auth.Token != "" { dialOptions = append(dialOptions, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(cfg.Auth.Token))) } @@ -114,7 +113,7 @@ func dialService(tb testing.TB, ctx context.Context, cfg config.Cfg) *grpc.Clien require.FailNow(tb, "cannot dial service without configured address") } - conn, err := client.DialContext(ctx, addr, dialOptions) + conn, err := client.Dial(ctx, addr, client.WithGrpcOptions(dialOptions)) require.NoError(tb, err) return conn } diff --git a/internal/git/remoterepo/repository.go b/internal/git/remoterepo/repository.go index 4b1f2eb8b..e84081ece 100644 --- a/internal/git/remoterepo/repository.go +++ b/internal/git/remoterepo/repository.go @@ -5,9 +5,9 @@ import ( "fmt" "sync" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "google.golang.org/grpc" ) diff --git a/internal/git/remoterepo/repository_test.go b/internal/git/remoterepo/repository_test.go index 21fcab898..05eaaee34 100644 --- a/internal/git/remoterepo/repository_test.go +++ b/internal/git/remoterepo/repository_test.go @@ -9,11 +9,11 @@ import ( "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/git/remoterepo" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" @@ -53,7 +53,7 @@ func TestRepository_ObjectHash(t *testing.T) { ctx = metadata.OutgoingToIncoming(ctx) pool := client.NewPool() - defer pool.Close() + defer testhelper.MustClose(t, pool) type setupData struct { repo *remoterepo.Repo diff --git a/internal/gitaly/gitalyclient/receive_pack.go b/internal/gitaly/gitalyclient/receive_pack.go new file mode 100644 index 000000000..0610f9e10 --- /dev/null +++ b/internal/gitaly/gitalyclient/receive_pack.go @@ -0,0 +1,42 @@ +package gitalyclient + +import ( + "context" + "io" + + "gitlab.com/gitlab-org/gitaly/v16/internal/stream" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitaly/v16/streamio" + "google.golang.org/grpc" +) + +// ReceivePack proxies an SSH git-receive-pack (git push) session to Gitaly +func ReceivePack(ctx context.Context, conn *grpc.ClientConn, stdin io.Reader, stdout, stderr io.Writer, req *gitalypb.SSHReceivePackRequest) (int32, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + ssh := gitalypb.NewSSHServiceClient(conn) + receivePackStream, err := ssh.SSHReceivePack(ctx) + if err != nil { + return 0, err + } + + if err = receivePackStream.Send(req); err != nil { + return 0, err + } + + inWriter := streamio.NewWriter(func(p []byte) error { + return receivePackStream.Send(&gitalypb.SSHReceivePackRequest{Stdin: p}) + }) + + return stream.Handler(func() (stream.StdoutStderrResponse, error) { + return receivePackStream.Recv() + }, func(errC chan error) { + _, errRecv := io.Copy(inWriter, stdin) + if err := receivePackStream.CloseSend(); err != nil && errRecv == nil { + errC <- err + } else { + errC <- errRecv + } + }, stdout, stderr) +} diff --git a/internal/gitaly/gitalyclient/upload_archive.go b/internal/gitaly/gitalyclient/upload_archive.go new file mode 100644 index 000000000..007d3c880 --- /dev/null +++ b/internal/gitaly/gitalyclient/upload_archive.go @@ -0,0 +1,42 @@ +package gitalyclient + +import ( + "context" + "io" + + "gitlab.com/gitlab-org/gitaly/v16/internal/stream" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitaly/v16/streamio" + "google.golang.org/grpc" +) + +// UploadArchive proxies an SSH git-upload-archive (git archive --remote) session to Gitaly +func UploadArchive(ctx context.Context, conn *grpc.ClientConn, stdin io.Reader, stdout, stderr io.Writer, req *gitalypb.SSHUploadArchiveRequest) (int32, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + ssh := gitalypb.NewSSHServiceClient(conn) + uploadPackStream, err := ssh.SSHUploadArchive(ctx) + if err != nil { + return 0, err + } + + if err = uploadPackStream.Send(req); err != nil { + return 0, err + } + + inWriter := streamio.NewWriter(func(p []byte) error { + return uploadPackStream.Send(&gitalypb.SSHUploadArchiveRequest{Stdin: p}) + }) + + return stream.Handler(func() (stream.StdoutStderrResponse, error) { + return uploadPackStream.Recv() + }, func(errC chan error) { + _, errRecv := io.Copy(inWriter, stdin) + if err := uploadPackStream.CloseSend(); err != nil && errRecv == nil { + errC <- err + } else { + errC <- errRecv + } + }, stdout, stderr) +} diff --git a/internal/gitaly/gitalyclient/upload_pack.go b/internal/gitaly/gitalyclient/upload_pack.go new file mode 100644 index 000000000..bd4f697c8 --- /dev/null +++ b/internal/gitaly/gitalyclient/upload_pack.go @@ -0,0 +1,86 @@ +package gitalyclient + +import ( + "context" + "io" + + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel" + "gitlab.com/gitlab-org/gitaly/v16/internal/stream" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitaly/v16/streamio" + "google.golang.org/grpc" +) + +// UploadPack proxies an SSH git-upload-pack (git fetch) session to Gitaly +func UploadPack(ctx context.Context, conn *grpc.ClientConn, stdin io.Reader, stdout, stderr io.Writer, req *gitalypb.SSHUploadPackRequest) (int32, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + ssh := gitalypb.NewSSHServiceClient(conn) + uploadPackStream, err := ssh.SSHUploadPack(ctx) + if err != nil { + return 0, err + } + + if err = uploadPackStream.Send(req); err != nil { + return 0, err + } + + inWriter := streamio.NewWriter(func(p []byte) error { + return uploadPackStream.Send(&gitalypb.SSHUploadPackRequest{Stdin: p}) + }) + + return stream.Handler(func() (stream.StdoutStderrResponse, error) { + return uploadPackStream.Recv() + }, func(errC chan error) { + _, errRecv := io.Copy(inWriter, stdin) + if err := uploadPackStream.CloseSend(); err != nil && errRecv == nil { + errC <- err + } else { + errC <- errRecv + } + }, stdout, stderr) +} + +// UploadPackResult wraps ExitCode and PackfileNegotiationStatistics. +type UploadPackResult struct { + ExitCode int32 + PackfileNegotiationStatistics *gitalypb.PackfileNegotiationStatistics +} + +// UploadPackWithSidechannel proxies an SSH git-upload-pack (git fetch) session to Gitaly using a sidechannel for the +// raw data transfer. +func UploadPackWithSidechannel( + ctx context.Context, + conn *grpc.ClientConn, + reg *sidechannel.Registry, + stdin io.Reader, + stdout, stderr io.Writer, + req *gitalypb.SSHUploadPackWithSidechannelRequest, +) (UploadPackResult, error) { + result := UploadPackResult{} + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + ctx, wt := sidechannel.RegisterSidechannel(ctx, reg, func(c *sidechannel.ClientConn) error { + return stream.ProxyPktLine(c, stdin, stdout, stderr) + }) + defer func() { + // We aleady check the error further down. + _ = wt.Close() + }() + + sshClient := gitalypb.NewSSHServiceClient(conn) + resp, err := sshClient.SSHUploadPackWithSidechannel(ctx, req) + if err != nil { + return result, err + } + result.ExitCode = 0 + result.PackfileNegotiationStatistics = resp.PackfileNegotiationStatistics + + if err := wt.Close(); err != nil { + return result, err + } + + return result, nil +} diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go index 0c90233e9..4ca5b2584 100644 --- a/internal/gitaly/server/auth_test.go +++ b/internal/gitaly/server/auth_test.go @@ -13,7 +13,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/cache" "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" @@ -26,6 +25,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/gitlab" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" @@ -188,7 +188,7 @@ func runServer(t *testing.T, cfg config.Cfg) string { logger := testhelper.SharedLogger(t) registry := backchannel.NewRegistry() conns := client.NewPool() - t.Cleanup(func() { conns.Close() }) + t.Cleanup(func() { testhelper.MustClose(t, conns) }) locator := config.NewLocator(cfg) txManager := transaction.NewManager(cfg, registry) gitCmdFactory := gittest.NewCommandFactory(t, cfg) @@ -234,7 +234,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string { } conns := client.NewPool() - t.Cleanup(func() { conns.Close() }) + t.Cleanup(func() { testhelper.MustClose(t, conns) }) srv, err := NewGitalyServerFactory( cfg, diff --git a/internal/gitaly/server/server_factory_test.go b/internal/gitaly/server/server_factory_test.go index 2ca530e98..f1d3ed929 100644 --- a/internal/gitaly/server/server_factory_test.go +++ b/internal/gitaly/server/server_factory_test.go @@ -11,11 +11,11 @@ import ( "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/bootstrap/starter" "gitlab.com/gitlab-org/gitaly/v16/internal/cache" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" "golang.org/x/sync/errgroup" @@ -65,7 +65,7 @@ func TestGitalyServerFactory(t *testing.T) { endpoint, err := starter.ComposeEndpoint(schema, listener.Addr().String()) require.NoError(t, err) - cc, err = client.Dial(endpoint, nil) + cc, err = client.Dial(ctx, endpoint) require.NoError(t, err) } t.Cleanup(func() { assert.NoError(t, cc.Close()) }) diff --git a/internal/gitaly/service/conflicts/server.go b/internal/gitaly/service/conflicts/server.go index cf9a86979..4490bf97a 100644 --- a/internal/gitaly/service/conflicts/server.go +++ b/internal/gitaly/service/conflicts/server.go @@ -3,7 +3,6 @@ package conflicts import ( "context" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" @@ -11,6 +10,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) diff --git a/internal/gitaly/service/dependencies.go b/internal/gitaly/service/dependencies.go index d0f568af3..e260a1465 100644 --- a/internal/gitaly/service/dependencies.go +++ b/internal/gitaly/service/dependencies.go @@ -1,7 +1,6 @@ package service import ( - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/backup" "gitlab.com/gitlab-org/gitaly/v16/internal/cache" "gitlab.com/gitlab-org/gitaly/v16/internal/git" @@ -16,6 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/gitlab" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" "gitlab.com/gitlab-org/gitaly/v16/internal/streamcache" diff --git a/internal/gitaly/service/operations/server.go b/internal/gitaly/service/operations/server.go index b96e1ab18..a3e09a5a7 100644 --- a/internal/gitaly/service/operations/server.go +++ b/internal/gitaly/service/operations/server.go @@ -3,7 +3,6 @@ package operations import ( "context" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" @@ -12,6 +11,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) diff --git a/internal/gitaly/service/remote/server.go b/internal/gitaly/service/remote/server.go index 1448cc106..1eb3dcf5d 100644 --- a/internal/gitaly/service/remote/server.go +++ b/internal/gitaly/service/remote/server.go @@ -1,12 +1,12 @@ package remote import ( - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) diff --git a/internal/gitaly/service/repository/replicate.go b/internal/gitaly/service/repository/replicate.go index e3a0b406f..233dd9bdf 100644 --- a/internal/gitaly/service/repository/replicate.go +++ b/internal/gitaly/service/repository/replicate.go @@ -11,7 +11,6 @@ import ( "strings" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/command" "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" "gitlab.com/gitlab-org/gitaly/v16/internal/git" @@ -21,6 +20,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/safe" diff --git a/internal/gitaly/service/repository/replicate_test.go b/internal/gitaly/service/repository/replicate_test.go index 761fe6d3b..9b677d7fd 100644 --- a/internal/gitaly/service/repository/replicate_test.go +++ b/internal/gitaly/service/repository/replicate_test.go @@ -14,7 +14,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" @@ -26,6 +25,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/text" @@ -851,7 +851,7 @@ func TestFetchInternalRemote_successful(t *testing.T) { getGitalySSHInvocationParams := listenGitalySSHCalls(t, localCfg) connsPool := client.NewPool() - defer connsPool.Close() + defer testhelper.MustClose(t, connsPool) // Use the `assert` package such that we can get information about why hooks have failed via // the hook logs in case it did fail unexpectedly. @@ -895,7 +895,7 @@ func TestFetchInternalRemote_failure(t *testing.T) { ctx = testhelper.MergeIncomingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg)) connsPool := client.NewPool() - defer connsPool.Close() + defer testhelper.MustClose(t, connsPool) err := fetchInternalRemote(ctx, &transaction.MockManager{}, connsPool, repo, &gitalypb.Repository{ StorageName: repoProto.GetStorageName(), diff --git a/internal/gitaly/service/repository/server.go b/internal/gitaly/service/repository/server.go index 0e354b2d1..5a2df1fed 100644 --- a/internal/gitaly/service/repository/server.go +++ b/internal/gitaly/service/repository/server.go @@ -3,7 +3,6 @@ package repository import ( "context" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/backup" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" @@ -14,6 +13,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/unarycache" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" diff --git a/internal/gitaly/service/repository/server_test.go b/internal/gitaly/service/repository/server_test.go index 26778d181..8cbcb1296 100644 --- a/internal/gitaly/service/repository/server_test.go +++ b/internal/gitaly/service/repository/server_test.go @@ -4,8 +4,8 @@ import ( "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "google.golang.org/grpc/metadata" ) @@ -13,7 +13,7 @@ import ( func TestGetConnectionByStorage(t *testing.T) { t.Parallel() connPool := client.NewPool() - defer connPool.Close() + defer testhelper.MustClose(t, connPool) s := server{conns: connPool} ctx := testhelper.Context(t) diff --git a/internal/gitaly/transaction/manager.go b/internal/gitaly/transaction/manager.go index 16152d16c..6ce6e05e2 100644 --- a/internal/gitaly/transaction/manager.go +++ b/internal/gitaly/transaction/manager.go @@ -9,10 +9,9 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel" - internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/transaction/txinfo" "gitlab.com/gitlab-org/gitaly/v16/internal/transaction/voting" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -67,10 +66,10 @@ type PoolManager struct { func NewManager(cfg config.Cfg, backchannels *backchannel.Registry) *PoolManager { return &PoolManager{ backchannels: backchannels, - conns: client.NewPoolWithOptions(client.WithDialOptions(append( + conns: client.NewPool(client.WithDialOptions(append( client.FailOnNonTempDialError(), - internalclient.UnaryInterceptor(), - internalclient.StreamInterceptor())..., + client.UnaryInterceptor(), + client.StreamInterceptor())..., )), votingDelayMetric: prometheus.NewHistogram( prometheus.HistogramOpts{ diff --git a/internal/grpc/client/dial.go b/internal/grpc/client/dial.go index 213605f15..a4f076c3a 100644 --- a/internal/grpc/client/dial.go +++ b/internal/grpc/client/dial.go @@ -16,6 +16,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/keepalive" "google.golang.org/protobuf/encoding/protojson" ) @@ -79,7 +80,7 @@ func WithHandshaker(handshaker Handshaker) DialOption { // `grpc.DialContext()`. func WithGrpcOptions(opts []grpc.DialOption) DialOption { return func(cfg *dialConfig) { - cfg.grpcOpts = opts + cfg.grpcOpts = append(cfg.grpcOpts, opts...) } } @@ -248,3 +249,29 @@ func defaultServiceConfig() string { return string(configJSON) } + +// FailOnNonTempDialError helps to identify if remote listener is ready to accept new connections. +func FailOnNonTempDialError() []grpc.DialOption { + return []grpc.DialOption{ + grpc.WithBlock(), + grpc.FailOnNonTempDialError(true), + } +} + +// HealthCheckDialer uses provided dialer as an actual dialer, but issues a health check request to the remote +// to verify the connection was set properly and could be used with no issues. +func HealthCheckDialer(base Dialer) Dialer { + return func(ctx context.Context, address string, dialOptions []grpc.DialOption) (*grpc.ClientConn, error) { + cc, err := base(ctx, address, dialOptions) + if err != nil { + return nil, err + } + + if _, err := healthpb.NewHealthClient(cc).Check(ctx, &healthpb.HealthCheckRequest{}); err != nil { + _ = cc.Close() + return nil, err + } + + return cc, nil + } +} diff --git a/internal/grpc/client/pool.go b/internal/grpc/client/pool.go new file mode 100644 index 000000000..a79112dc8 --- /dev/null +++ b/internal/grpc/client/pool.go @@ -0,0 +1,128 @@ +package client + +import ( + "context" + "errors" + "fmt" + "sync" + + gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth" + "google.golang.org/grpc" +) + +type poolConfig struct { + dialer Dialer + dialOptions []grpc.DialOption +} + +// PoolOption is an option that can be passed to NewPool. +type PoolOption func(*poolConfig) + +// Dialer is a dialer used to create new connections. +type Dialer func(ctx context.Context, address string, opts []grpc.DialOption) (*grpc.ClientConn, error) + +// WithDialer sets the dialer that is called for each new gRPC connection the pool establishes. +func WithDialer(dialer Dialer) PoolOption { + return func(options *poolConfig) { + options.dialer = dialer + } +} + +// WithDialOptions sets gRPC options to use for the gRPC Dial call. +func WithDialOptions(dialOptions ...grpc.DialOption) PoolOption { + return func(options *poolConfig) { + options.dialOptions = dialOptions + } +} + +type poolKey struct{ address, token string } + +// Pool is a pool of GRPC connections. Connections created by it are safe for +// concurrent use. +type Pool struct { + lock sync.RWMutex + conns map[poolKey]*grpc.ClientConn + dialer Dialer + dialOptions []grpc.DialOption +} + +// NewPool creates a new connection pool that's ready for use. +func NewPool(opts ...PoolOption) *Pool { + cfg := poolConfig{ + dialer: func(ctx context.Context, address string, opts []grpc.DialOption) (*grpc.ClientConn, error) { + return Dial(ctx, address, WithGrpcOptions(opts)) + }, + } + for _, opt := range opts { + opt(&cfg) + } + + return &Pool{ + conns: make(map[poolKey]*grpc.ClientConn), + dialer: cfg.dialer, + dialOptions: cfg.dialOptions, + } +} + +// Close closes all connections tracked by the connection pool. +func (p *Pool) Close() error { + p.lock.Lock() + defer p.lock.Unlock() + + var firstError error + for addr, conn := range p.conns { + if err := conn.Close(); err != nil && firstError == nil { + firstError = err + } + + delete(p.conns, addr) + } + + return firstError +} + +// Dial creates a new client connection in case no connection to the given +// address exists already or returns an already established connection. The +// returned address must not be `Close()`d. +func (p *Pool) Dial(ctx context.Context, address, token string) (*grpc.ClientConn, error) { + return p.getOrCreateConnection(ctx, address, token) +} + +func (p *Pool) getOrCreateConnection(ctx context.Context, address, token string) (*grpc.ClientConn, error) { + if address == "" { + return nil, errors.New("address is empty") + } + + key := poolKey{address: address, token: token} + + p.lock.RLock() + cc, ok := p.conns[key] + p.lock.RUnlock() + + if ok { + return cc, nil + } + + p.lock.Lock() + defer p.lock.Unlock() + + cc, ok = p.conns[key] + if ok { + return cc, nil + } + + opts := make([]grpc.DialOption, 0, len(p.dialOptions)+1) + opts = append(opts, p.dialOptions...) + if token != "" { + opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token))) + } + + cc, err := p.dialer(ctx, address, opts) + if err != nil { + return nil, fmt.Errorf("could not dial source: %w", err) + } + + p.conns[key] = cc + + return cc, nil +} diff --git a/internal/grpc/proxy/handler_ext_test.go b/internal/grpc/proxy/handler_ext_test.go index bfa1236c5..e03945dc9 100644 --- a/internal/grpc/proxy/handler_ext_test.go +++ b/internal/grpc/proxy/handler_ext_test.go @@ -18,7 +18,7 @@ import ( "github.com/getsentry/sentry-go" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" @@ -511,7 +511,7 @@ func TestRegisterStreamHandlers(t *testing.T) { go testhelper.MustServe(t, server, listener) defer server.Stop() - conn, err := client.Dial("tcp://"+listener.Addr().String(), []grpc.DialOption{grpc.WithBlock()}) + conn, err := client.Dial(ctx, "tcp://"+listener.Addr().String(), client.WithGrpcOptions([]grpc.DialOption{grpc.WithBlock()})) require.NoError(t, err) defer conn.Close() client := grpc_testing.NewTestServiceClient(conn) diff --git a/internal/grpc/sidechannel/conn.go b/internal/grpc/sidechannel/conn.go index f77bbcbe3..e76496ea9 100644 --- a/internal/grpc/sidechannel/conn.go +++ b/internal/grpc/sidechannel/conn.go @@ -1,12 +1,16 @@ package sidechannel import ( + "context" "fmt" "io" "net" + "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/streamio" + "google.golang.org/grpc" ) // ServerConn and ClientConn implement an asymmetric framing protocol to @@ -179,3 +183,10 @@ func (cc *ClientConn) CloseWrite() error { return nil } + +// Dial configures the dialer to establish a Gitaly backchannel connection instead of a regular gRPC connection. It +// also injects sr as a sidechannel registry, so that Gitaly can establish sidechannels back to the client. +func Dial(ctx context.Context, registry *Registry, logger logrus.FieldLogger, rawAddress string, connOpts []grpc.DialOption) (*grpc.ClientConn, error) { + clientHandshaker := NewClientHandshaker(logger, registry) + return client.Dial(ctx, rawAddress, client.WithGrpcOptions(connOpts), client.WithHandshaker(clientHandshaker)) +} diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index feee4a8c8..70837c5ad 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -16,7 +16,6 @@ import ( "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/cache" "gitlab.com/gitlab-org/gitaly/v16/internal/datastructure" "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" @@ -24,6 +23,7 @@ import ( gconfig "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" gitaly_metadata "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" @@ -2158,9 +2158,9 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { gitalypb.RegisterOperationServiceServer(srv, operationServer) }, testserver.WithDiskCache(&mockDiskCache{}), testserver.WithDisablePraefect()) - conn, err := client.DialContext(ctx, addr, []grpc.DialOption{ + conn, err := client.Dial(ctx, addr, client.WithGrpcOptions([]grpc.DialOption{ grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())), - }) + })) require.NoError(t, err) defer conn.Close() diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go index d8905bf6f..0d29c752c 100644 --- a/internal/praefect/nodes/manager_test.go +++ b/internal/praefect/nodes/manager_test.go @@ -9,8 +9,8 @@ import ( "time" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/datastructure" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" @@ -458,6 +458,8 @@ func TestMgr_GetSyncedNode(t *testing.T) { } func TestNodeStatus_IsHealthy(t *testing.T) { + ctx := testhelper.Context(t) + checkNTimes := func(t *testing.T, ctx context.Context, ns *nodeStatus, n int) { for i := 0; i < n; i++ { _, err := ns.CheckHealth(ctx) @@ -470,12 +472,11 @@ func TestNodeStatus_IsHealthy(t *testing.T) { healthSrv := testhelper.NewServerWithHealth(t, socket) - clientConn, err := client.Dial(address, nil) + clientConn, err := client.Dial(ctx, address) require.NoError(t, err) defer func() { require.NoError(t, clientConn.Close()) }() node := config.Node{Storage: "gitaly-0", Address: address} - ctx := testhelper.Context(t) logger := testhelper.SharedLogger(t) latencyHistMock := &promtest.MockHistogramVec{} diff --git a/internal/praefect/nodes/ping.go b/internal/praefect/nodes/ping.go index 79b730336..fb15cfdec 100644 --- a/internal/praefect/nodes/ping.go +++ b/internal/praefect/nodes/ping.go @@ -9,8 +9,7 @@ import ( "sync" gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth" - "gitlab.com/gitlab-org/gitaly/v16/client" - internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "google.golang.org/grpc" @@ -73,15 +72,15 @@ func (p *Ping) Address() string { func (p *Ping) dial(ctx context.Context) (*grpc.ClientConn, error) { opts := []grpc.DialOption{ grpc.WithBlock(), - internalclient.UnaryInterceptor(), - internalclient.StreamInterceptor(), + client.UnaryInterceptor(), + client.StreamInterceptor(), } if len(p.token) > 0 { opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(p.token))) } - return client.DialContext(ctx, p.address, opts) + return client.Dial(ctx, p.address, client.WithGrpcOptions(opts)) } func (p *Ping) healthCheck(ctx context.Context, cc *grpc.ClientConn) (grpc_health_v1.HealthCheckResponse_ServingStatus, error) { diff --git a/internal/praefect/replicator_pg_test.go b/internal/praefect/replicator_pg_test.go index 3098e8735..02f59bf6a 100644 --- a/internal/praefect/replicator_pg_test.go +++ b/internal/praefect/replicator_pg_test.go @@ -7,8 +7,8 @@ import ( "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/repository" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testdb" @@ -36,7 +36,7 @@ func TestReplicatorInvalidSourceRepository(t *testing.T) { defer srv.Stop() go testhelper.MustServe(t, srv, ln) - targetCC, err := client.Dial(ln.Addr().Network()+":"+ln.Addr().String(), nil) + targetCC, err := client.Dial(ctx, ln.Addr().Network()+":"+ln.Addr().String()) require.NoError(t, err) defer testhelper.MustClose(t, targetCC) diff --git a/internal/praefect/service/checks.go b/internal/praefect/service/checks.go index 1201d7177..bcec3354a 100644 --- a/internal/praefect/service/checks.go +++ b/internal/praefect/service/checks.go @@ -11,8 +11,7 @@ import ( migrate "github.com/rubenv/sql-migrate" gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth" - "gitlab.com/gitlab-org/gitaly/v16/client" - internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/env" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" @@ -263,14 +262,14 @@ func NewClockSyncCheck(clockDriftCheck func(ntpHost string, driftThreshold time. g.Go(func() error { opts := []grpc.DialOption{ grpc.WithBlock(), - internalclient.UnaryInterceptor(), - internalclient.StreamInterceptor(), + client.UnaryInterceptor(), + client.StreamInterceptor(), } if len(node.Token) > 0 { opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(node.Token))) } - cc, err := client.DialContext(ctx, node.Address, opts) + cc, err := client.Dial(ctx, node.Address, client.WithGrpcOptions(opts)) if err != nil { return fmt.Errorf("%s machine: %w", node.Address, err) } diff --git a/internal/praefect/testserver.go b/internal/praefect/testserver.go index 3ab759937..f3954667d 100644 --- a/internal/praefect/testserver.go +++ b/internal/praefect/testserver.go @@ -9,9 +9,9 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/client" gitalycfgauth "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/auth" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/server/auth" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" @@ -110,7 +110,7 @@ func listenAvailPort(tb testing.TB) (net.Listener, int) { return listener, listener.Addr().(*net.TCPAddr).Port } -func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn { +func dialLocalPort(tb testing.TB, ctx context.Context, port int, backend bool) *grpc.ClientConn { opts := []grpc.DialOption{ grpc.WithBlock(), grpc.WithUnaryInterceptor(correlation.UnaryClientCorrelationInterceptor()), @@ -124,8 +124,9 @@ func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn { } cc, err := client.Dial( + ctx, fmt.Sprintf("tcp://localhost:%d", port), - opts, + client.WithGrpcOptions(opts), ) require.NoError(tb, err) @@ -275,7 +276,7 @@ func RunPraefectServer( replMgrDone := startProcessBacklog(ctx, replmgr) // dial client to praefect - cc := dialLocalPort(tb, port, false) + cc := dialLocalPort(tb, ctx, port, false) cleanup := func() { cc.Close() diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 8cc44245c..04b2ccbdb 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -10,7 +10,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth" - "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/backup" "gitlab.com/gitlab-org/gitaly/v16/internal/cache" "gitlab.com/gitlab-org/gitaly/v16/internal/git" @@ -30,7 +29,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/gitlab" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel" - internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" @@ -139,14 +138,14 @@ func (gs GitalyServer) Address() string { func waitHealthy(tb testing.TB, ctx context.Context, addr string, authToken string) { grpcOpts := []grpc.DialOption{ grpc.WithBlock(), - internalclient.UnaryInterceptor(), - internalclient.StreamInterceptor(), + client.UnaryInterceptor(), + client.StreamInterceptor(), } if authToken != "" { grpcOpts = append(grpcOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(authToken))) } - conn, err := client.DialContext(ctx, addr, grpcOpts) + conn, err := client.Dial(ctx, addr, client.WithGrpcOptions(grpcOpts)) require.NoError(tb, err) defer testhelper.MustClose(tb, conn) @@ -173,7 +172,7 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d } deps := gsd.createDependencies(tb, cfg) - tb.Cleanup(func() { gsd.conns.Close() }) + tb.Cleanup(func() { testhelper.MustClose(tb, gsd.conns) }) serverFactory := server.NewGitalyServerFactory( cfg, @@ -280,7 +279,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * } if gsd.conns == nil { - gsd.conns = client.NewPool(internalclient.UnaryInterceptor(), internalclient.StreamInterceptor()) + gsd.conns = client.NewPool(client.WithDialOptions(client.UnaryInterceptor(), client.StreamInterceptor())) } if gsd.locator == nil { diff --git a/tools/test-boot/main.go b/tools/test-boot/main.go index d27e53856..a9739b00b 100644 --- a/tools/test-boot/main.go +++ b/tools/test-boot/main.go @@ -14,7 +14,7 @@ import ( "time" "github.com/urfave/cli/v2" - "gitlab.com/gitlab-org/gitaly/v16/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "google.golang.org/grpc" ) @@ -112,9 +112,9 @@ func spawnAndWait(ctx context.Context, gitalyBin, configPath, socketPath string) for i := 0; i < 300; i++ { ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) - conn, err := client.DialContext(ctx, "unix://"+socketPath, []grpc.DialOption{ + conn, err := client.Dial(ctx, "unix://"+socketPath, client.WithGrpcOptions([]grpc.DialOption{ grpc.WithBlock(), - }) + })) cancel() |