Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorToon Claes <toon@gitlab.com>2023-09-08 16:36:29 +0300
committerToon Claes <toon@gitlab.com>2023-09-08 16:36:29 +0300
commite7e38b42d14bd849c49e24034f750fc210c226ae (patch)
tree63aeaceefcb49c1269ed2c463ac0c84ad382dbc6
parent5c6c63b811c45b73b6b18e74203493c5696ee549 (diff)
parent2b86a6e586f3ae1a223bf21193366272e72623c7 (diff)
Merge branch 'pks-client-split-external-and-internal' into 'master'
client: Strictly distinguish internal and public code See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6336 Merged-by: Toon Claes <toon@gitlab.com> Approved-by: Justin Tobler <jtobler@gitlab.com> Approved-by: Toon Claes <toon@gitlab.com> Co-authored-by: Patrick Steinhardt <psteinhardt@gitlab.com>
-rw-r--r--.golangci.yml3
-rw-r--r--client/dial.go23
-rw-r--r--client/pool.go90
-rw-r--r--client/pool_options.go39
-rw-r--r--client/pool_test.go2
-rw-r--r--client/receive_pack.go31
-rw-r--r--client/upload_archive.go31
-rw-r--r--client/upload_pack.go62
-rw-r--r--cmd/gitaly-backup/create.go9
-rw-r--r--cmd/gitaly-backup/restore.go9
-rw-r--r--cmd/gitaly-hooks/hooks.go10
-rw-r--r--cmd/gitaly-ssh/main.go20
-rw-r--r--cmd/gitaly-ssh/main_test.go8
-rw-r--r--cmd/gitaly-ssh/receive_pack.go7
-rw-r--r--cmd/gitaly-ssh/upload_archive.go7
-rw-r--r--cmd/gitaly-ssh/upload_pack.go16
-rw-r--r--internal/backup/backup.go2
-rw-r--r--internal/backup/backup_test.go6
-rw-r--r--internal/backup/server_side.go2
-rw-r--r--internal/backup/server_side_test.go2
-rw-r--r--internal/cli/gitaly/serve.go19
-rw-r--r--internal/cli/gitaly/subcmd_hooks.go9
-rw-r--r--internal/cli/gitaly/subcmd_hooks_test.go12
-rw-r--r--internal/cli/praefect/subcmd.go9
-rw-r--r--internal/cli/praefect/subcmd_list_untracked_repositories_test.go8
-rw-r--r--internal/cli/praefect/subcmd_remove_repository_test.go7
-rw-r--r--internal/cli/praefect/subcmd_track_repositories_test.go8
-rw-r--r--internal/cli/praefect/subcmd_track_repository_test.go8
-rw-r--r--internal/git/gittest/repo.go7
-rw-r--r--internal/git/remoterepo/repository.go2
-rw-r--r--internal/git/remoterepo/repository_test.go4
-rw-r--r--internal/gitaly/gitalyclient/receive_pack.go42
-rw-r--r--internal/gitaly/gitalyclient/upload_archive.go42
-rw-r--r--internal/gitaly/gitalyclient/upload_pack.go86
-rw-r--r--internal/gitaly/server/auth_test.go6
-rw-r--r--internal/gitaly/server/server_factory_test.go4
-rw-r--r--internal/gitaly/service/conflicts/server.go2
-rw-r--r--internal/gitaly/service/dependencies.go2
-rw-r--r--internal/gitaly/service/operations/server.go2
-rw-r--r--internal/gitaly/service/remote/server.go2
-rw-r--r--internal/gitaly/service/repository/replicate.go2
-rw-r--r--internal/gitaly/service/repository/replicate_test.go6
-rw-r--r--internal/gitaly/service/repository/server.go2
-rw-r--r--internal/gitaly/service/repository/server_test.go4
-rw-r--r--internal/gitaly/transaction/manager.go9
-rw-r--r--internal/grpc/client/dial.go29
-rw-r--r--internal/grpc/client/pool.go128
-rw-r--r--internal/grpc/proxy/handler_ext_test.go4
-rw-r--r--internal/grpc/sidechannel/conn.go11
-rw-r--r--internal/praefect/coordinator_test.go6
-rw-r--r--internal/praefect/nodes/manager_test.go7
-rw-r--r--internal/praefect/nodes/ping.go9
-rw-r--r--internal/praefect/replicator_pg_test.go4
-rw-r--r--internal/praefect/service/checks.go9
-rw-r--r--internal/praefect/testserver.go9
-rw-r--r--internal/testhelper/testserver/gitaly.go13
-rw-r--r--tools/test-boot/main.go6
57 files changed, 529 insertions, 389 deletions
diff --git a/.golangci.yml b/.golangci.yml
index c1346546c..825be0e44 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -50,6 +50,8 @@ linters-settings:
desc: "ioutil is deprecated starting with Go 1.16"
- pkg: "gitlab.com/gitlab-org/labkit/log"
desc: "use internal/log instead"
+ - pkg: "gitlab.com/gitlab-org/gitaly/v16/client"
+ desc: "use internal/grpc/client instead"
errcheck:
# The following are functions for which we are currently not consistently
# checking returned errors. This is not intended as a list of known-okay
@@ -58,7 +60,6 @@ linters-settings:
exclude-functions:
- (*database/sql.DB).Close
- (*database/sql.Rows).Close
- - (*gitlab.com/gitlab-org/gitaly/v16/client.Pool).Close
- (*gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel.ServerConn).Close
- (*gitlab.com/gitlab-org/gitaly/v16/internal/streamcache.pipe).Close
- (*gitlab.com/gitlab-org/gitaly/v16/internal/streamcache.pipeReader).Close
diff --git a/client/dial.go b/client/dial.go
index beaa7489a..3bc103b4c 100644
--- a/client/dial.go
+++ b/client/dial.go
@@ -12,7 +12,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/dnsresolver"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel"
"google.golang.org/grpc"
- healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
// DefaultDialOpts hold the default DialOptions for connection to Gitaly over UNIX-socket
@@ -42,34 +41,18 @@ func Dial(rawAddress string, connOpts []grpc.DialOption) (*grpc.ClientConn, erro
// injects sr as a sidechannel registry, so that Gitaly can establish
// sidechannels back to the client.
func DialSidechannel(ctx context.Context, rawAddress string, sr *SidechannelRegistry, connOpts []grpc.DialOption) (*grpc.ClientConn, error) {
- clientHandshaker := sidechannel.NewClientHandshaker(sr.logger, sr.registry)
- return client.Dial(ctx, rawAddress, client.WithGrpcOptions(connOpts), client.WithHandshaker(clientHandshaker))
+ return sidechannel.Dial(ctx, sr.registry, sr.logger, rawAddress, connOpts)
}
// FailOnNonTempDialError helps to identify if remote listener is ready to accept new connections.
func FailOnNonTempDialError() []grpc.DialOption {
- return []grpc.DialOption{
- grpc.WithBlock(),
- grpc.FailOnNonTempDialError(true),
- }
+ return client.FailOnNonTempDialError()
}
// HealthCheckDialer uses provided dialer as an actual dialer, but issues a health check request to the remote
// to verify the connection was set properly and could be used with no issues.
func HealthCheckDialer(base Dialer) Dialer {
- return func(ctx context.Context, address string, dialOptions []grpc.DialOption) (*grpc.ClientConn, error) {
- cc, err := base(ctx, address, dialOptions)
- if err != nil {
- return nil, err
- }
-
- if _, err := healthpb.NewHealthClient(cc).Check(ctx, &healthpb.HealthCheckRequest{}); err != nil {
- _ = cc.Close()
- return nil, err
- }
-
- return cc, nil
- }
+ return Dialer(client.HealthCheckDialer(client.Dialer(base)))
}
// DNSResolverBuilderConfig exposes the DNS resolver builder option. It is used to build Gitaly
diff --git a/client/pool.go b/client/pool.go
index ec430f73e..1354f726c 100644
--- a/client/pool.go
+++ b/client/pool.go
@@ -2,26 +2,30 @@ package client
import (
"context"
- "errors"
- "fmt"
- "sync"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"google.golang.org/grpc"
)
+// PoolOption is an option that can be passed to NewPoolWithOptions.
+type PoolOption = client.PoolOption
+
// Dialer is used by the Pool to create a *grpc.ClientConn.
type Dialer func(ctx context.Context, address string, dialOptions []grpc.DialOption) (*grpc.ClientConn, error)
-type poolKey struct{ address, token string }
+// WithDialer sets the dialer that is called for each new gRPC connection the pool establishes.
+func WithDialer(dialer Dialer) PoolOption {
+ return client.WithDialer(client.Dialer(dialer))
+}
+
+// WithDialOptions sets gRPC options to use for the gRPC Dial call.
+func WithDialOptions(dialOptions ...grpc.DialOption) PoolOption {
+ return client.WithDialOptions(dialOptions...)
+}
-// Pool is a pool of GRPC connections. Connections created by it are safe for
-// concurrent use.
+// Pool is a pool of GRPC connections. Connections created by it are safe for concurrent use.
type Pool struct {
- lock sync.RWMutex
- conns map[poolKey]*grpc.ClientConn
- dialer Dialer
- dialOptions []grpc.DialOption
+ pool *client.Pool
}
// NewPool creates a new connection pool that's ready for use.
@@ -31,73 +35,19 @@ func NewPool(dialOptions ...grpc.DialOption) *Pool {
// NewPoolWithOptions creates a new connection pool that's ready for use.
func NewPoolWithOptions(poolOptions ...PoolOption) *Pool {
- opts := applyPoolOptions(poolOptions)
return &Pool{
- conns: make(map[poolKey]*grpc.ClientConn),
- dialer: opts.dialer,
- dialOptions: opts.dialOptions,
- }
-}
-
-// Close closes all connections tracked by the connection pool.
-func (p *Pool) Close() error {
- p.lock.Lock()
- defer p.lock.Unlock()
-
- var firstError error
- for addr, conn := range p.conns {
- if err := conn.Close(); err != nil && firstError == nil {
- firstError = err
- }
-
- delete(p.conns, addr)
+ pool: client.NewPool(poolOptions...),
}
-
- return firstError
}
// Dial creates a new client connection in case no connection to the given
// address exists already or returns an already established connection. The
// returned address must not be `Close()`d.
func (p *Pool) Dial(ctx context.Context, address, token string) (*grpc.ClientConn, error) {
- return p.getOrCreateConnection(ctx, address, token)
+ return p.pool.Dial(ctx, address, token)
}
-func (p *Pool) getOrCreateConnection(ctx context.Context, address, token string) (*grpc.ClientConn, error) {
- if address == "" {
- return nil, errors.New("address is empty")
- }
-
- key := poolKey{address: address, token: token}
-
- p.lock.RLock()
- cc, ok := p.conns[key]
- p.lock.RUnlock()
-
- if ok {
- return cc, nil
- }
-
- p.lock.Lock()
- defer p.lock.Unlock()
-
- cc, ok = p.conns[key]
- if ok {
- return cc, nil
- }
-
- opts := make([]grpc.DialOption, 0, len(p.dialOptions)+1)
- opts = append(opts, p.dialOptions...)
- if token != "" {
- opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token)))
- }
-
- cc, err := p.dialer(ctx, address, opts)
- if err != nil {
- return nil, fmt.Errorf("could not dial source: %w", err)
- }
-
- p.conns[key] = cc
-
- return cc, nil
+// Close closes all connections tracked by the connection pool.
+func (p *Pool) Close() error {
+ return p.pool.Close()
}
diff --git a/client/pool_options.go b/client/pool_options.go
deleted file mode 100644
index a8139c1f2..000000000
--- a/client/pool_options.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package client
-
-import "google.golang.org/grpc"
-
-type poolOptions struct {
- dialer Dialer
- dialOptions []grpc.DialOption
-}
-
-//nolint:revive // This is unintentionally missing documentation.
-type PoolOption func(*poolOptions)
-
-func applyPoolOptions(options []PoolOption) *poolOptions {
- opts := defaultPoolOptions()
- for _, opt := range options {
- opt(opts)
- }
- return opts
-}
-
-func defaultPoolOptions() *poolOptions {
- return &poolOptions{
- dialer: DialContext,
- }
-}
-
-// WithDialer sets the dialer that is called for each new gRPC connection the pool establishes.
-func WithDialer(dialer Dialer) PoolOption {
- return func(options *poolOptions) {
- options.dialer = dialer
- }
-}
-
-// WithDialOptions sets gRPC options to use for the gRPC Dial call.
-func WithDialOptions(dialOptions ...grpc.DialOption) PoolOption {
- return func(options *poolOptions) {
- options.dialOptions = dialOptions
- }
-}
diff --git a/client/pool_test.go b/client/pool_test.go
index af29f4bb7..262fa85a5 100644
--- a/client/pool_test.go
+++ b/client/pool_test.go
@@ -230,7 +230,7 @@ func TestPool_Dial_same_addr_another_token(t *testing.T) {
defer func() { stop1() }()
pool := NewPool()
- defer pool.Close()
+ defer testhelper.MustClose(t, pool)
// all good - server is running and serving requests
conn, err := pool.Dial(ctx, addr, "")
diff --git a/client/receive_pack.go b/client/receive_pack.go
index c59f87d06..2d8d68e44 100644
--- a/client/receive_pack.go
+++ b/client/receive_pack.go
@@ -4,39 +4,12 @@ import (
"context"
"io"
- "gitlab.com/gitlab-org/gitaly/v16/internal/stream"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/gitalyclient"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
- "gitlab.com/gitlab-org/gitaly/v16/streamio"
"google.golang.org/grpc"
)
// ReceivePack proxies an SSH git-receive-pack (git push) session to Gitaly
func ReceivePack(ctx context.Context, conn *grpc.ClientConn, stdin io.Reader, stdout, stderr io.Writer, req *gitalypb.SSHReceivePackRequest) (int32, error) {
- ctx2, cancel := context.WithCancel(ctx)
- defer cancel()
-
- ssh := gitalypb.NewSSHServiceClient(conn)
- receivePackStream, err := ssh.SSHReceivePack(ctx2)
- if err != nil {
- return 0, err
- }
-
- if err = receivePackStream.Send(req); err != nil {
- return 0, err
- }
-
- inWriter := streamio.NewWriter(func(p []byte) error {
- return receivePackStream.Send(&gitalypb.SSHReceivePackRequest{Stdin: p})
- })
-
- return stream.Handler(func() (stream.StdoutStderrResponse, error) {
- return receivePackStream.Recv()
- }, func(errC chan error) {
- _, errRecv := io.Copy(inWriter, stdin)
- if err := receivePackStream.CloseSend(); err != nil && errRecv == nil {
- errC <- err
- } else {
- errC <- errRecv
- }
- }, stdout, stderr)
+ return gitalyclient.ReceivePack(ctx, conn, stdin, stdout, stderr, req)
}
diff --git a/client/upload_archive.go b/client/upload_archive.go
index 654e7c583..572e6bc9b 100644
--- a/client/upload_archive.go
+++ b/client/upload_archive.go
@@ -4,39 +4,12 @@ import (
"context"
"io"
- "gitlab.com/gitlab-org/gitaly/v16/internal/stream"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/gitalyclient"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
- "gitlab.com/gitlab-org/gitaly/v16/streamio"
"google.golang.org/grpc"
)
// UploadArchive proxies an SSH git-upload-archive (git archive --remote) session to Gitaly
func UploadArchive(ctx context.Context, conn *grpc.ClientConn, stdin io.Reader, stdout, stderr io.Writer, req *gitalypb.SSHUploadArchiveRequest) (int32, error) {
- ctx2, cancel := context.WithCancel(ctx)
- defer cancel()
-
- ssh := gitalypb.NewSSHServiceClient(conn)
- uploadPackStream, err := ssh.SSHUploadArchive(ctx2)
- if err != nil {
- return 0, err
- }
-
- if err = uploadPackStream.Send(req); err != nil {
- return 0, err
- }
-
- inWriter := streamio.NewWriter(func(p []byte) error {
- return uploadPackStream.Send(&gitalypb.SSHUploadArchiveRequest{Stdin: p})
- })
-
- return stream.Handler(func() (stream.StdoutStderrResponse, error) {
- return uploadPackStream.Recv()
- }, func(errC chan error) {
- _, errRecv := io.Copy(inWriter, stdin)
- if err := uploadPackStream.CloseSend(); err != nil && errRecv == nil {
- errC <- err
- } else {
- errC <- errRecv
- }
- }, stdout, stderr)
+ return gitalyclient.UploadArchive(ctx, conn, stdin, stdout, stderr, req)
}
diff --git a/client/upload_pack.go b/client/upload_pack.go
index 65d81746e..4ad17dcb4 100644
--- a/client/upload_pack.go
+++ b/client/upload_pack.go
@@ -4,48 +4,18 @@ import (
"context"
"io"
- "gitlab.com/gitlab-org/gitaly/v16/internal/stream"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/gitalyclient"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
- "gitlab.com/gitlab-org/gitaly/v16/streamio"
"google.golang.org/grpc"
)
// UploadPack proxies an SSH git-upload-pack (git fetch) session to Gitaly
func UploadPack(ctx context.Context, conn *grpc.ClientConn, stdin io.Reader, stdout, stderr io.Writer, req *gitalypb.SSHUploadPackRequest) (int32, error) {
- ctx2, cancel := context.WithCancel(ctx)
- defer cancel()
-
- ssh := gitalypb.NewSSHServiceClient(conn)
- uploadPackStream, err := ssh.SSHUploadPack(ctx2)
- if err != nil {
- return 0, err
- }
-
- if err = uploadPackStream.Send(req); err != nil {
- return 0, err
- }
-
- inWriter := streamio.NewWriter(func(p []byte) error {
- return uploadPackStream.Send(&gitalypb.SSHUploadPackRequest{Stdin: p})
- })
-
- return stream.Handler(func() (stream.StdoutStderrResponse, error) {
- return uploadPackStream.Recv()
- }, func(errC chan error) {
- _, errRecv := io.Copy(inWriter, stdin)
- if err := uploadPackStream.CloseSend(); err != nil && errRecv == nil {
- errC <- err
- } else {
- errC <- errRecv
- }
- }, stdout, stderr)
+ return gitalyclient.UploadPack(ctx, conn, stdin, stdout, stderr, req)
}
// UploadPackResult wraps ExitCode and PackfileNegotiationStatistics.
-type UploadPackResult struct {
- ExitCode int32
- PackfileNegotiationStatistics *gitalypb.PackfileNegotiationStatistics
-}
+type UploadPackResult = gitalyclient.UploadPackResult
// UploadPackWithSidechannelWithResult proxies an SSH git-upload-pack (git fetch)
// session to Gitaly using a sidechannel for the raw data transfer.
@@ -57,31 +27,7 @@ func UploadPackWithSidechannelWithResult(
stdout, stderr io.Writer,
req *gitalypb.SSHUploadPackWithSidechannelRequest,
) (UploadPackResult, error) {
- result := UploadPackResult{}
- ctx, cancel := context.WithCancel(ctx)
- defer cancel()
-
- ctx, wt := reg.Register(ctx, func(c SidechannelConn) error {
- return stream.ProxyPktLine(c, stdin, stdout, stderr)
- })
- defer func() {
- // We aleady check the error further down.
- _ = wt.Close()
- }()
-
- sshClient := gitalypb.NewSSHServiceClient(conn)
- resp, err := sshClient.SSHUploadPackWithSidechannel(ctx, req)
- if err != nil {
- return result, err
- }
- result.ExitCode = 0
- result.PackfileNegotiationStatistics = resp.PackfileNegotiationStatistics
-
- if err := wt.Close(); err != nil {
- return result, err
- }
-
- return result, nil
+ return gitalyclient.UploadPackWithSidechannel(ctx, conn, reg.registry, stdin, stdout, stderr, req)
}
// UploadPackWithSidechannel proxies an SSH git-upload-pack (git fetch)
diff --git a/cmd/gitaly-backup/create.go b/cmd/gitaly-backup/create.go
index 010f1055a..f2fb543af 100644
--- a/cmd/gitaly-backup/create.go
+++ b/cmd/gitaly-backup/create.go
@@ -10,10 +10,9 @@ import (
"time"
"github.com/sirupsen/logrus"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/backup"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
- internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
@@ -45,8 +44,10 @@ func (cmd *createSubcommand) Flags(fs *flag.FlagSet) {
}
func (cmd *createSubcommand) Run(ctx context.Context, logger logrus.FieldLogger, stdin io.Reader, stdout io.Writer) error {
- pool := client.NewPool(internalclient.UnaryInterceptor(), internalclient.StreamInterceptor())
- defer pool.Close()
+ pool := client.NewPool(client.WithDialOptions(client.UnaryInterceptor(), client.StreamInterceptor()))
+ defer func() {
+ _ = pool.Close()
+ }()
var manager backup.Strategy
if cmd.serverSide {
diff --git a/cmd/gitaly-backup/restore.go b/cmd/gitaly-backup/restore.go
index c5a5c8fc5..d2d0670fc 100644
--- a/cmd/gitaly-backup/restore.go
+++ b/cmd/gitaly-backup/restore.go
@@ -11,10 +11,9 @@ import (
"strings"
"github.com/sirupsen/logrus"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/backup"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
- internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
@@ -50,8 +49,10 @@ func (cmd *restoreSubcommand) Flags(fs *flag.FlagSet) {
}
func (cmd *restoreSubcommand) Run(ctx context.Context, logger logrus.FieldLogger, stdin io.Reader, stdout io.Writer) error {
- pool := client.NewPool(internalclient.UnaryInterceptor(), internalclient.StreamInterceptor())
- defer pool.Close()
+ pool := client.NewPool(client.WithDialOptions(client.UnaryInterceptor(), client.StreamInterceptor()))
+ defer func() {
+ _ = pool.Close()
+ }()
var manager backup.Strategy
if cmd.serverSide {
diff --git a/cmd/gitaly-hooks/hooks.go b/cmd/gitaly-hooks/hooks.go
index a19873cf0..d8bd931fb 100644
--- a/cmd/gitaly-hooks/hooks.go
+++ b/cmd/gitaly-hooks/hooks.go
@@ -12,10 +12,10 @@ import (
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/sirupsen/logrus"
gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/featureflag"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/env"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
@@ -175,7 +175,7 @@ func executeHook(ctx context.Context, cmd hookCommand, args []string) error {
ctx = injectMetadataIntoOutgoingCtx(ctx, payload)
- conn, err := dialGitaly(payload)
+ conn, err := dialGitaly(ctx, payload)
if err != nil {
return fmt.Errorf("error when connecting to gitaly: %w", err)
}
@@ -207,8 +207,8 @@ func injectMetadataIntoOutgoingCtx(ctx context.Context, payload git.HooksPayload
func noopSender(c chan error) {}
-func dialGitaly(payload git.HooksPayload) (*grpc.ClientConn, error) {
- dialOpts := client.DefaultDialOpts
+func dialGitaly(ctx context.Context, payload git.HooksPayload) (*grpc.ClientConn, error) {
+ var dialOpts []grpc.DialOption
if payload.InternalSocketToken != "" {
dialOpts = append(dialOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(payload.InternalSocketToken)))
}
@@ -234,7 +234,7 @@ func dialGitaly(payload git.HooksPayload) (*grpc.ClientConn, error) {
dialOpts = append(dialOpts, grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(unaryInterceptors...)))
dialOpts = append(dialOpts, grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(streamInterceptors...)))
- conn, err := client.Dial("unix://"+payload.InternalSocket, dialOpts)
+ conn, err := client.Dial(ctx, "unix://"+payload.InternalSocket, client.WithGrpcOptions(dialOpts))
if err != nil {
return nil, fmt.Errorf("error when dialing: %w", err)
}
diff --git a/cmd/gitaly-ssh/main.go b/cmd/gitaly-ssh/main.go
index 121186378..5b1bf262f 100644
--- a/cmd/gitaly-ssh/main.go
+++ b/cmd/gitaly-ssh/main.go
@@ -9,15 +9,15 @@ import (
"github.com/sirupsen/logrus"
gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/featureflag"
- internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/labkit/tracing"
"google.golang.org/grpc"
)
-type packFn func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry, string) (int32, error)
+type packFn func(context.Context, *grpc.ClientConn, *sidechannel.Registry, string) (int32, error)
type gitalySSHCommand struct {
// The git packer that shall be executed. One of receivePack,
@@ -117,8 +117,8 @@ func (cmd gitalySSHCommand) run(logger logrus.FieldLogger) (int, error) {
}
}
- registry := client.NewSidechannelRegistry(logger)
- conn, err := getConnection(ctx, cmd.address, registry)
+ registry := sidechannel.NewRegistry()
+ conn, err := getConnection(ctx, cmd.address, registry, logger)
if err != nil {
return 1, err
}
@@ -132,25 +132,25 @@ func (cmd gitalySSHCommand) run(logger logrus.FieldLogger) (int, error) {
return int(code), nil
}
-func getConnection(ctx context.Context, url string, registry *client.SidechannelRegistry) (*grpc.ClientConn, error) {
+func getConnection(ctx context.Context, url string, registry *sidechannel.Registry, logger logrus.FieldLogger) (*grpc.ClientConn, error) {
if url == "" {
return nil, fmt.Errorf("gitaly address can not be empty")
}
if useSidechannel() {
- return client.DialSidechannel(ctx, url, registry, dialOpts())
+ return sidechannel.Dial(ctx, registry, logger, url, dialOpts())
}
- return client.DialContext(ctx, url, dialOpts())
+ return client.Dial(ctx, url, client.WithGrpcOptions(dialOpts()))
}
func dialOpts() []grpc.DialOption {
- connOpts := client.DefaultDialOpts
+ var connOpts []grpc.DialOption
if token := os.Getenv("GITALY_TOKEN"); token != "" {
connOpts = append(connOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token)))
}
- return append(connOpts, internalclient.UnaryInterceptor(), internalclient.StreamInterceptor())
+ return append(connOpts, client.UnaryInterceptor(), client.StreamInterceptor())
}
func useSidechannel() bool { return os.Getenv("GITALY_USE_SIDECHANNEL") == "1" }
diff --git a/cmd/gitaly-ssh/main_test.go b/cmd/gitaly-ssh/main_test.go
index 258509e9c..640b7e5e6 100644
--- a/cmd/gitaly-ssh/main_test.go
+++ b/cmd/gitaly-ssh/main_test.go
@@ -7,19 +7,19 @@ import (
"testing"
"github.com/stretchr/testify/assert"
- "gitlab.com/gitlab-org/gitaly/v16/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
"google.golang.org/grpc"
)
func TestRun(t *testing.T) {
- var successPacker packFn = func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry, string) (int32, error) {
+ var successPacker packFn = func(context.Context, *grpc.ClientConn, *sidechannel.Registry, string) (int32, error) {
return 0, nil
}
- var exitCodePacker packFn = func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry, string) (int32, error) {
+ var exitCodePacker packFn = func(context.Context, *grpc.ClientConn, *sidechannel.Registry, string) (int32, error) {
return 123, nil
}
- var errorPacker packFn = func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry, string) (int32, error) {
+ var errorPacker packFn = func(context.Context, *grpc.ClientConn, *sidechannel.Registry, string) (int32, error) {
return 1, fmt.Errorf("fail")
}
diff --git a/cmd/gitaly-ssh/receive_pack.go b/cmd/gitaly-ssh/receive_pack.go
index f5cc8a51f..268e3d69f 100644
--- a/cmd/gitaly-ssh/receive_pack.go
+++ b/cmd/gitaly-ssh/receive_pack.go
@@ -5,13 +5,14 @@ import (
"fmt"
"os"
- "gitlab.com/gitlab-org/gitaly/v16/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/gitalyclient"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson"
)
-func receivePack(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry, req string) (int32, error) {
+func receivePack(ctx context.Context, conn *grpc.ClientConn, registry *sidechannel.Registry, req string) (int32, error) {
var request gitalypb.SSHReceivePackRequest
if err := protojson.Unmarshal([]byte(req), &request); err != nil {
@@ -21,5 +22,5 @@ func receivePack(ctx context.Context, conn *grpc.ClientConn, registry *client.Si
ctx, cancel := context.WithCancel(ctx)
defer cancel()
- return client.ReceivePack(ctx, conn, os.Stdin, os.Stdout, os.Stderr, &request)
+ return gitalyclient.ReceivePack(ctx, conn, os.Stdin, os.Stdout, os.Stderr, &request)
}
diff --git a/cmd/gitaly-ssh/upload_archive.go b/cmd/gitaly-ssh/upload_archive.go
index de1d434f7..f92ffa965 100644
--- a/cmd/gitaly-ssh/upload_archive.go
+++ b/cmd/gitaly-ssh/upload_archive.go
@@ -5,13 +5,14 @@ import (
"fmt"
"os"
- "gitlab.com/gitlab-org/gitaly/v16/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/gitalyclient"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson"
)
-func uploadArchive(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry, req string) (int32, error) {
+func uploadArchive(ctx context.Context, conn *grpc.ClientConn, registry *sidechannel.Registry, req string) (int32, error) {
var request gitalypb.SSHUploadArchiveRequest
if err := protojson.Unmarshal([]byte(req), &request); err != nil {
return 0, fmt.Errorf("json unmarshal: %w", err)
@@ -20,5 +21,5 @@ func uploadArchive(ctx context.Context, conn *grpc.ClientConn, registry *client.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
- return client.UploadArchive(ctx, conn, os.Stdin, os.Stdout, os.Stderr, &request)
+ return gitalyclient.UploadArchive(ctx, conn, os.Stdin, os.Stdout, os.Stderr, &request)
}
diff --git a/cmd/gitaly-ssh/upload_pack.go b/cmd/gitaly-ssh/upload_pack.go
index b4a9e0755..d300a3fc0 100644
--- a/cmd/gitaly-ssh/upload_pack.go
+++ b/cmd/gitaly-ssh/upload_pack.go
@@ -5,7 +5,8 @@ import (
"fmt"
"os"
- "gitlab.com/gitlab-org/gitaly/v16/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/gitalyclient"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson"
@@ -22,7 +23,7 @@ func uploadPackConfig(config []string) []string {
return append([]string{GitConfigShowAllRefs}, config...)
}
-func uploadPack(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry, req string) (int32, error) {
+func uploadPack(ctx context.Context, conn *grpc.ClientConn, registry *sidechannel.Registry, req string) (int32, error) {
var request gitalypb.SSHUploadPackRequest
if err := protojson.Unmarshal([]byte(req), &request); err != nil {
return 0, fmt.Errorf("json unmarshal: %w", err)
@@ -33,10 +34,10 @@ func uploadPack(ctx context.Context, conn *grpc.ClientConn, registry *client.Sid
ctx, cancel := context.WithCancel(ctx)
defer cancel()
- return client.UploadPack(ctx, conn, os.Stdin, os.Stdout, os.Stderr, &request)
+ return gitalyclient.UploadPack(ctx, conn, os.Stdin, os.Stdout, os.Stderr, &request)
}
-func uploadPackWithSidechannel(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry, req string) (int32, error) {
+func uploadPackWithSidechannel(ctx context.Context, conn *grpc.ClientConn, registry *sidechannel.Registry, req string) (int32, error) {
var request gitalypb.SSHUploadPackWithSidechannelRequest
if err := protojson.Unmarshal([]byte(req), &request); err != nil {
return 0, fmt.Errorf("json unmarshal: %w", err)
@@ -47,5 +48,10 @@ func uploadPackWithSidechannel(ctx context.Context, conn *grpc.ClientConn, regis
ctx, cancel := context.WithCancel(ctx)
defer cancel()
- return client.UploadPackWithSidechannel(ctx, conn, registry, os.Stdin, os.Stdout, os.Stderr, &request)
+ result, err := gitalyclient.UploadPackWithSidechannel(ctx, conn, registry, os.Stdin, os.Stdout, os.Stderr, &request)
+ if err != nil {
+ return 0, err
+ }
+
+ return result.ExitCode, nil
}
diff --git a/internal/backup/backup.go b/internal/backup/backup.go
index fd308abd2..267ec5f99 100644
--- a/internal/backup/backup.go
+++ b/internal/backup/backup.go
@@ -7,13 +7,13 @@ import (
"fmt"
"io"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go
index eb4fa2011..ac7b155c0 100644
--- a/internal/backup/backup_test.go
+++ b/internal/backup/backup_test.go
@@ -9,7 +9,6 @@ import (
"testing"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/archive"
"gitlab.com/gitlab-org/gitaly/v16/internal/backup"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
@@ -20,6 +19,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
@@ -430,7 +430,7 @@ func TestManager_Restore_latest(t *testing.T) {
ctx := testhelper.Context(t)
- cc, err := client.Dial(cfg.SocketPath, nil)
+ cc, err := client.Dial(ctx, cfg.SocketPath)
require.NoError(t, err)
defer testhelper.MustClose(t, cc)
@@ -773,7 +773,7 @@ func TestManager_Restore_specific(t *testing.T) {
ctx := testhelper.Context(t)
- cc, err := client.Dial(cfg.SocketPath, nil)
+ cc, err := client.Dial(ctx, cfg.SocketPath)
require.NoError(t, err)
defer testhelper.MustClose(t, cc)
diff --git a/internal/backup/server_side.go b/internal/backup/server_side.go
index 1a2cf6b41..ecfb08d5d 100644
--- a/internal/backup/server_side.go
+++ b/internal/backup/server_side.go
@@ -4,8 +4,8 @@ import (
"context"
"fmt"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"google.golang.org/grpc/codes"
diff --git a/internal/backup/server_side_test.go b/internal/backup/server_side_test.go
index 5d69b70b2..13d375d69 100644
--- a/internal/backup/server_side_test.go
+++ b/internal/backup/server_side_test.go
@@ -6,13 +6,13 @@ import (
"testing"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/backup"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index 989c175c3..9822b2ab4 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -13,7 +13,6 @@ import (
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
"gitlab.com/gitlab-org/gitaly/v16"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/backup"
"gitlab.com/gitlab-org/gitaly/v16/internal/bootstrap"
"gitlab.com/gitlab-org/gitaly/v16/internal/bootstrap/starter"
@@ -36,7 +35,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitlab"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
- internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/env"
@@ -257,15 +256,21 @@ func run(cfg config.Cfg, logger logrus.FieldLogger) error {
hookManager = hm
}
- conns := client.NewPoolWithOptions(
- client.WithDialer(client.HealthCheckDialer(client.DialContext)),
+ conns := client.NewPool(
+ client.WithDialer(client.HealthCheckDialer(
+ func(ctx context.Context, address string, opts []grpc.DialOption) (*grpc.ClientConn, error) {
+ return client.Dial(ctx, address, client.WithGrpcOptions(opts))
+ },
+ )),
client.WithDialOptions(append(
client.FailOnNonTempDialError(),
- internalclient.UnaryInterceptor(),
- internalclient.StreamInterceptor())...,
+ client.UnaryInterceptor(),
+ client.StreamInterceptor())...,
),
)
- defer conns.Close()
+ defer func() {
+ _ = conns.Close()
+ }()
catfileCache := catfile.NewCache(cfg)
defer catfileCache.Stop()
diff --git a/internal/cli/gitaly/subcmd_hooks.go b/internal/cli/gitaly/subcmd_hooks.go
index f0d0946d6..16b2e08c9 100644
--- a/internal/cli/gitaly/subcmd_hooks.go
+++ b/internal/cli/gitaly/subcmd_hooks.go
@@ -9,9 +9,8 @@ import (
"github.com/urfave/cli/v2"
gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
- internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/v16/streamio"
@@ -151,8 +150,8 @@ func dial(ctx context.Context, addr, token string, timeout time.Duration, opts .
opts = append(opts,
grpc.WithBlock(),
- internalclient.UnaryInterceptor(),
- internalclient.StreamInterceptor(),
+ client.UnaryInterceptor(),
+ client.StreamInterceptor(),
)
if len(token) > 0 {
@@ -163,7 +162,7 @@ func dial(ctx context.Context, addr, token string, timeout time.Duration, opts .
)
}
- return client.DialContext(ctx, addr, opts)
+ return client.Dial(ctx, addr, client.WithGrpcOptions(opts))
}
func getAddressWithScheme(cfg config.Cfg) (string, error) {
diff --git a/internal/cli/gitaly/subcmd_hooks_test.go b/internal/cli/gitaly/subcmd_hooks_test.go
index a3287c830..338a72c0e 100644
--- a/internal/cli/gitaly/subcmd_hooks_test.go
+++ b/internal/cli/gitaly/subcmd_hooks_test.go
@@ -2,6 +2,7 @@ package gitaly
import (
"bytes"
+ "context"
"io"
"io/fs"
"os/exec"
@@ -10,12 +11,11 @@ import (
"github.com/stretchr/testify/require"
gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
- gclient "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
- internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver"
@@ -41,7 +41,7 @@ func TestSetHooksSubcommand(t *testing.T) {
// The generated socket path already has the unix prefix. This needs to be
// removed because the Gitaly config does not expect a scheme to be present.
cfg.SocketPath = strings.TrimPrefix(serverSocketPath, "unix://")
- client := newRepositoryClient(t, cfg, serverSocketPath)
+ client := newRepositoryClient(t, ctx, cfg, serverSocketPath)
configPath := testcfg.WriteTemporaryGitalyConfigFile(t, cfg)
@@ -216,16 +216,16 @@ func TestSetHooksSubcommand(t *testing.T) {
}
}
-func newRepositoryClient(tb testing.TB, cfg config.Cfg, serverSocketPath string) gitalypb.RepositoryServiceClient {
+func newRepositoryClient(tb testing.TB, ctx context.Context, cfg config.Cfg, serverSocketPath string) gitalypb.RepositoryServiceClient {
tb.Helper()
connOpts := []grpc.DialOption{
- internalclient.UnaryInterceptor(), internalclient.StreamInterceptor(),
+ client.UnaryInterceptor(), client.StreamInterceptor(),
}
if cfg.Auth.Token != "" {
connOpts = append(connOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(cfg.Auth.Token)))
}
- conn, err := gclient.Dial(serverSocketPath, connOpts)
+ conn, err := client.Dial(ctx, serverSocketPath, client.WithGrpcOptions(connOpts))
require.NoError(tb, err)
tb.Cleanup(func() { require.NoError(tb, conn.Close()) })
diff --git a/internal/cli/praefect/subcmd.go b/internal/cli/praefect/subcmd.go
index aa7a2d623..bb171b2a3 100644
--- a/internal/cli/praefect/subcmd.go
+++ b/internal/cli/praefect/subcmd.go
@@ -9,8 +9,7 @@ import (
"time"
gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
- "gitlab.com/gitlab-org/gitaly/v16/client"
- internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore/glsql"
"google.golang.org/grpc"
@@ -62,8 +61,8 @@ func subCmdDial(ctx context.Context, addr, token string, timeout time.Duration,
opts = append(opts,
grpc.WithBlock(),
- internalclient.UnaryInterceptor(),
- internalclient.StreamInterceptor(),
+ client.UnaryInterceptor(),
+ client.StreamInterceptor(),
)
if len(token) > 0 {
@@ -74,7 +73,7 @@ func subCmdDial(ctx context.Context, addr, token string, timeout time.Duration,
)
}
- return client.DialContext(ctx, addr, opts)
+ return client.Dial(ctx, addr, client.WithGrpcOptions(opts))
}
type requiredParameterError string
diff --git a/internal/cli/praefect/subcmd_list_untracked_repositories_test.go b/internal/cli/praefect/subcmd_list_untracked_repositories_test.go
index 694f99040..560f22217 100644
--- a/internal/cli/praefect/subcmd_list_untracked_repositories_test.go
+++ b/internal/cli/praefect/subcmd_list_untracked_repositories_test.go
@@ -11,9 +11,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/urfave/cli/v2"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
@@ -24,6 +24,9 @@ import (
func TestListUntrackedRepositoriesCommand(t *testing.T) {
t.Parallel()
+
+ ctx := testhelper.Context(t)
+
g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
@@ -53,11 +56,10 @@ func TestListUntrackedRepositoriesCommand(t *testing.T) {
praefectServer := testserver.StartPraefect(t, conf)
- cc, err := client.Dial(praefectServer.Address(), nil)
+ cc, err := client.Dial(ctx, praefectServer.Address())
require.NoError(t, err)
defer func() { require.NoError(t, cc.Close()) }()
repoClient := gitalypb.NewRepositoryServiceClient(cc)
- ctx := testhelper.Context(t)
praefectStorage := conf.VirtualStorages[0].Name
diff --git a/internal/cli/praefect/subcmd_remove_repository_test.go b/internal/cli/praefect/subcmd_remove_repository_test.go
index 489524e36..faf277824 100644
--- a/internal/cli/praefect/subcmd_remove_repository_test.go
+++ b/internal/cli/praefect/subcmd_remove_repository_test.go
@@ -12,10 +12,10 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/urfave/cli/v2"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
gitalycfg "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
@@ -30,6 +30,8 @@ import (
func TestRemoveRepositorySubcommand(t *testing.T) {
t.Parallel()
+ ctx := testhelper.Context(t)
+
g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
@@ -61,11 +63,10 @@ func TestRemoveRepositorySubcommand(t *testing.T) {
praefectServer := testserver.StartPraefect(t, conf)
- cc, err := client.Dial(praefectServer.Address(), nil)
+ cc, err := client.Dial(ctx, praefectServer.Address())
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, cc.Close()) })
repoClient := gitalypb.NewRepositoryServiceClient(cc)
- ctx := testhelper.Context(t)
praefectStorage := conf.VirtualStorages[0].Name
diff --git a/internal/cli/praefect/subcmd_track_repositories_test.go b/internal/cli/praefect/subcmd_track_repositories_test.go
index d5371a32c..85af24ef2 100644
--- a/internal/cli/praefect/subcmd_track_repositories_test.go
+++ b/internal/cli/praefect/subcmd_track_repositories_test.go
@@ -11,8 +11,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
@@ -28,6 +28,9 @@ import (
func TestTrackRepositoriesSubcommand(t *testing.T) {
t.Parallel()
+
+ ctx := testhelper.Context(t)
+
g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
testcfg.BuildGitalyHooks(t, g2Cfg)
@@ -65,10 +68,9 @@ func TestTrackRepositoriesSubcommand(t *testing.T) {
}
confPath := writeConfigToFile(t, conf)
- gitalyCC, err := client.Dial(g1Addr, nil)
+ gitalyCC, err := client.Dial(ctx, g1Addr)
require.NoError(t, err)
defer testhelper.MustClose(t, gitalyCC)
- ctx := testhelper.Context(t)
gitaly1RepositoryClient := gitalypb.NewRepositoryServiceClient(gitalyCC)
diff --git a/internal/cli/praefect/subcmd_track_repository_test.go b/internal/cli/praefect/subcmd_track_repository_test.go
index bd10b3411..f1b2e5308 100644
--- a/internal/cli/praefect/subcmd_track_repository_test.go
+++ b/internal/cli/praefect/subcmd_track_repository_test.go
@@ -7,8 +7,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
@@ -23,6 +23,9 @@ import (
func TestTrackRepositorySubcommand(t *testing.T) {
t.Parallel()
+
+ ctx := testhelper.Context(t)
+
g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
testcfg.BuildGitalyHooks(t, g2Cfg)
@@ -60,10 +63,9 @@ func TestTrackRepositorySubcommand(t *testing.T) {
}
confPath := writeConfigToFile(t, conf)
- gitalyCC, err := client.Dial(g1Addr, nil)
+ gitalyCC, err := client.Dial(ctx, g1Addr)
require.NoError(t, err)
defer func() { require.NoError(t, gitalyCC.Close()) }()
- ctx := testhelper.Context(t)
gitaly1RepositoryClient := gitalypb.NewRepositoryServiceClient(gitalyCC)
diff --git a/internal/git/gittest/repo.go b/internal/git/gittest/repo.go
index 68edeeecb..0e1878dbf 100644
--- a/internal/git/gittest/repo.go
+++ b/internal/git/gittest/repo.go
@@ -11,11 +11,10 @@ import (
"github.com/stretchr/testify/require"
gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
- internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
@@ -97,7 +96,7 @@ type CreateRepositoryConfig struct {
func dialService(tb testing.TB, ctx context.Context, cfg config.Cfg) *grpc.ClientConn {
tb.Helper()
- dialOptions := []grpc.DialOption{internalclient.UnaryInterceptor(), internalclient.StreamInterceptor()}
+ dialOptions := []grpc.DialOption{client.UnaryInterceptor(), client.StreamInterceptor()}
if cfg.Auth.Token != "" {
dialOptions = append(dialOptions, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(cfg.Auth.Token)))
}
@@ -114,7 +113,7 @@ func dialService(tb testing.TB, ctx context.Context, cfg config.Cfg) *grpc.Clien
require.FailNow(tb, "cannot dial service without configured address")
}
- conn, err := client.DialContext(ctx, addr, dialOptions)
+ conn, err := client.Dial(ctx, addr, client.WithGrpcOptions(dialOptions))
require.NoError(tb, err)
return conn
}
diff --git a/internal/git/remoterepo/repository.go b/internal/git/remoterepo/repository.go
index 4b1f2eb8b..e84081ece 100644
--- a/internal/git/remoterepo/repository.go
+++ b/internal/git/remoterepo/repository.go
@@ -5,9 +5,9 @@ import (
"fmt"
"sync"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"google.golang.org/grpc"
)
diff --git a/internal/git/remoterepo/repository_test.go b/internal/git/remoterepo/repository_test.go
index 21fcab898..05eaaee34 100644
--- a/internal/git/remoterepo/repository_test.go
+++ b/internal/git/remoterepo/repository_test.go
@@ -9,11 +9,11 @@ import (
"testing"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/remoterepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
@@ -53,7 +53,7 @@ func TestRepository_ObjectHash(t *testing.T) {
ctx = metadata.OutgoingToIncoming(ctx)
pool := client.NewPool()
- defer pool.Close()
+ defer testhelper.MustClose(t, pool)
type setupData struct {
repo *remoterepo.Repo
diff --git a/internal/gitaly/gitalyclient/receive_pack.go b/internal/gitaly/gitalyclient/receive_pack.go
new file mode 100644
index 000000000..0610f9e10
--- /dev/null
+++ b/internal/gitaly/gitalyclient/receive_pack.go
@@ -0,0 +1,42 @@
+package gitalyclient
+
+import (
+ "context"
+ "io"
+
+ "gitlab.com/gitlab-org/gitaly/v16/internal/stream"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/v16/streamio"
+ "google.golang.org/grpc"
+)
+
+// ReceivePack proxies an SSH git-receive-pack (git push) session to Gitaly
+func ReceivePack(ctx context.Context, conn *grpc.ClientConn, stdin io.Reader, stdout, stderr io.Writer, req *gitalypb.SSHReceivePackRequest) (int32, error) {
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ ssh := gitalypb.NewSSHServiceClient(conn)
+ receivePackStream, err := ssh.SSHReceivePack(ctx)
+ if err != nil {
+ return 0, err
+ }
+
+ if err = receivePackStream.Send(req); err != nil {
+ return 0, err
+ }
+
+ inWriter := streamio.NewWriter(func(p []byte) error {
+ return receivePackStream.Send(&gitalypb.SSHReceivePackRequest{Stdin: p})
+ })
+
+ return stream.Handler(func() (stream.StdoutStderrResponse, error) {
+ return receivePackStream.Recv()
+ }, func(errC chan error) {
+ _, errRecv := io.Copy(inWriter, stdin)
+ if err := receivePackStream.CloseSend(); err != nil && errRecv == nil {
+ errC <- err
+ } else {
+ errC <- errRecv
+ }
+ }, stdout, stderr)
+}
diff --git a/internal/gitaly/gitalyclient/upload_archive.go b/internal/gitaly/gitalyclient/upload_archive.go
new file mode 100644
index 000000000..007d3c880
--- /dev/null
+++ b/internal/gitaly/gitalyclient/upload_archive.go
@@ -0,0 +1,42 @@
+package gitalyclient
+
+import (
+ "context"
+ "io"
+
+ "gitlab.com/gitlab-org/gitaly/v16/internal/stream"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/v16/streamio"
+ "google.golang.org/grpc"
+)
+
+// UploadArchive proxies an SSH git-upload-archive (git archive --remote) session to Gitaly
+func UploadArchive(ctx context.Context, conn *grpc.ClientConn, stdin io.Reader, stdout, stderr io.Writer, req *gitalypb.SSHUploadArchiveRequest) (int32, error) {
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ ssh := gitalypb.NewSSHServiceClient(conn)
+ uploadPackStream, err := ssh.SSHUploadArchive(ctx)
+ if err != nil {
+ return 0, err
+ }
+
+ if err = uploadPackStream.Send(req); err != nil {
+ return 0, err
+ }
+
+ inWriter := streamio.NewWriter(func(p []byte) error {
+ return uploadPackStream.Send(&gitalypb.SSHUploadArchiveRequest{Stdin: p})
+ })
+
+ return stream.Handler(func() (stream.StdoutStderrResponse, error) {
+ return uploadPackStream.Recv()
+ }, func(errC chan error) {
+ _, errRecv := io.Copy(inWriter, stdin)
+ if err := uploadPackStream.CloseSend(); err != nil && errRecv == nil {
+ errC <- err
+ } else {
+ errC <- errRecv
+ }
+ }, stdout, stderr)
+}
diff --git a/internal/gitaly/gitalyclient/upload_pack.go b/internal/gitaly/gitalyclient/upload_pack.go
new file mode 100644
index 000000000..bd4f697c8
--- /dev/null
+++ b/internal/gitaly/gitalyclient/upload_pack.go
@@ -0,0 +1,86 @@
+package gitalyclient
+
+import (
+ "context"
+ "io"
+
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/stream"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/v16/streamio"
+ "google.golang.org/grpc"
+)
+
+// UploadPack proxies an SSH git-upload-pack (git fetch) session to Gitaly
+func UploadPack(ctx context.Context, conn *grpc.ClientConn, stdin io.Reader, stdout, stderr io.Writer, req *gitalypb.SSHUploadPackRequest) (int32, error) {
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ ssh := gitalypb.NewSSHServiceClient(conn)
+ uploadPackStream, err := ssh.SSHUploadPack(ctx)
+ if err != nil {
+ return 0, err
+ }
+
+ if err = uploadPackStream.Send(req); err != nil {
+ return 0, err
+ }
+
+ inWriter := streamio.NewWriter(func(p []byte) error {
+ return uploadPackStream.Send(&gitalypb.SSHUploadPackRequest{Stdin: p})
+ })
+
+ return stream.Handler(func() (stream.StdoutStderrResponse, error) {
+ return uploadPackStream.Recv()
+ }, func(errC chan error) {
+ _, errRecv := io.Copy(inWriter, stdin)
+ if err := uploadPackStream.CloseSend(); err != nil && errRecv == nil {
+ errC <- err
+ } else {
+ errC <- errRecv
+ }
+ }, stdout, stderr)
+}
+
+// UploadPackResult wraps ExitCode and PackfileNegotiationStatistics.
+type UploadPackResult struct {
+ ExitCode int32
+ PackfileNegotiationStatistics *gitalypb.PackfileNegotiationStatistics
+}
+
+// UploadPackWithSidechannel proxies an SSH git-upload-pack (git fetch) session to Gitaly using a sidechannel for the
+// raw data transfer.
+func UploadPackWithSidechannel(
+ ctx context.Context,
+ conn *grpc.ClientConn,
+ reg *sidechannel.Registry,
+ stdin io.Reader,
+ stdout, stderr io.Writer,
+ req *gitalypb.SSHUploadPackWithSidechannelRequest,
+) (UploadPackResult, error) {
+ result := UploadPackResult{}
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ ctx, wt := sidechannel.RegisterSidechannel(ctx, reg, func(c *sidechannel.ClientConn) error {
+ return stream.ProxyPktLine(c, stdin, stdout, stderr)
+ })
+ defer func() {
+ // We aleady check the error further down.
+ _ = wt.Close()
+ }()
+
+ sshClient := gitalypb.NewSSHServiceClient(conn)
+ resp, err := sshClient.SSHUploadPackWithSidechannel(ctx, req)
+ if err != nil {
+ return result, err
+ }
+ result.ExitCode = 0
+ result.PackfileNegotiationStatistics = resp.PackfileNegotiationStatistics
+
+ if err := wt.Close(); err != nil {
+ return result, err
+ }
+
+ return result, nil
+}
diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go
index 0c90233e9..4ca5b2584 100644
--- a/internal/gitaly/server/auth_test.go
+++ b/internal/gitaly/server/auth_test.go
@@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/cache"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
@@ -26,6 +25,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitlab"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
@@ -188,7 +188,7 @@ func runServer(t *testing.T, cfg config.Cfg) string {
logger := testhelper.SharedLogger(t)
registry := backchannel.NewRegistry()
conns := client.NewPool()
- t.Cleanup(func() { conns.Close() })
+ t.Cleanup(func() { testhelper.MustClose(t, conns) })
locator := config.NewLocator(cfg)
txManager := transaction.NewManager(cfg, registry)
gitCmdFactory := gittest.NewCommandFactory(t, cfg)
@@ -234,7 +234,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string {
}
conns := client.NewPool()
- t.Cleanup(func() { conns.Close() })
+ t.Cleanup(func() { testhelper.MustClose(t, conns) })
srv, err := NewGitalyServerFactory(
cfg,
diff --git a/internal/gitaly/server/server_factory_test.go b/internal/gitaly/server/server_factory_test.go
index 2ca530e98..f1d3ed929 100644
--- a/internal/gitaly/server/server_factory_test.go
+++ b/internal/gitaly/server/server_factory_test.go
@@ -11,11 +11,11 @@ import (
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/bootstrap/starter"
"gitlab.com/gitlab-org/gitaly/v16/internal/cache"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
"golang.org/x/sync/errgroup"
@@ -65,7 +65,7 @@ func TestGitalyServerFactory(t *testing.T) {
endpoint, err := starter.ComposeEndpoint(schema, listener.Addr().String())
require.NoError(t, err)
- cc, err = client.Dial(endpoint, nil)
+ cc, err = client.Dial(ctx, endpoint)
require.NoError(t, err)
}
t.Cleanup(func() { assert.NoError(t, cc.Close()) })
diff --git a/internal/gitaly/service/conflicts/server.go b/internal/gitaly/service/conflicts/server.go
index cf9a86979..4490bf97a 100644
--- a/internal/gitaly/service/conflicts/server.go
+++ b/internal/gitaly/service/conflicts/server.go
@@ -3,7 +3,6 @@ package conflicts
import (
"context"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
@@ -11,6 +10,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
diff --git a/internal/gitaly/service/dependencies.go b/internal/gitaly/service/dependencies.go
index d0f568af3..e260a1465 100644
--- a/internal/gitaly/service/dependencies.go
+++ b/internal/gitaly/service/dependencies.go
@@ -1,7 +1,6 @@
package service
import (
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/backup"
"gitlab.com/gitlab-org/gitaly/v16/internal/cache"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
@@ -16,6 +15,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitlab"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
"gitlab.com/gitlab-org/gitaly/v16/internal/streamcache"
diff --git a/internal/gitaly/service/operations/server.go b/internal/gitaly/service/operations/server.go
index b96e1ab18..a3e09a5a7 100644
--- a/internal/gitaly/service/operations/server.go
+++ b/internal/gitaly/service/operations/server.go
@@ -3,7 +3,6 @@ package operations
import (
"context"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
@@ -12,6 +11,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
diff --git a/internal/gitaly/service/remote/server.go b/internal/gitaly/service/remote/server.go
index 1448cc106..1eb3dcf5d 100644
--- a/internal/gitaly/service/remote/server.go
+++ b/internal/gitaly/service/remote/server.go
@@ -1,12 +1,12 @@
package remote
import (
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
diff --git a/internal/gitaly/service/repository/replicate.go b/internal/gitaly/service/repository/replicate.go
index e3a0b406f..233dd9bdf 100644
--- a/internal/gitaly/service/repository/replicate.go
+++ b/internal/gitaly/service/repository/replicate.go
@@ -11,7 +11,6 @@ import (
"strings"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/command"
"gitlab.com/gitlab-org/gitaly/v16/internal/featureflag"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
@@ -21,6 +20,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v16/internal/safe"
diff --git a/internal/gitaly/service/repository/replicate_test.go b/internal/gitaly/service/repository/replicate_test.go
index 761fe6d3b..9b677d7fd 100644
--- a/internal/gitaly/service/repository/replicate_test.go
+++ b/internal/gitaly/service/repository/replicate_test.go
@@ -14,7 +14,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/featureflag"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
@@ -26,6 +25,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/text"
@@ -851,7 +851,7 @@ func TestFetchInternalRemote_successful(t *testing.T) {
getGitalySSHInvocationParams := listenGitalySSHCalls(t, localCfg)
connsPool := client.NewPool()
- defer connsPool.Close()
+ defer testhelper.MustClose(t, connsPool)
// Use the `assert` package such that we can get information about why hooks have failed via
// the hook logs in case it did fail unexpectedly.
@@ -895,7 +895,7 @@ func TestFetchInternalRemote_failure(t *testing.T) {
ctx = testhelper.MergeIncomingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg))
connsPool := client.NewPool()
- defer connsPool.Close()
+ defer testhelper.MustClose(t, connsPool)
err := fetchInternalRemote(ctx, &transaction.MockManager{}, connsPool, repo, &gitalypb.Repository{
StorageName: repoProto.GetStorageName(),
diff --git a/internal/gitaly/service/repository/server.go b/internal/gitaly/service/repository/server.go
index 0e354b2d1..5a2df1fed 100644
--- a/internal/gitaly/service/repository/server.go
+++ b/internal/gitaly/service/repository/server.go
@@ -3,7 +3,6 @@ package repository
import (
"context"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/backup"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
@@ -14,6 +13,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/internal/unarycache"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
diff --git a/internal/gitaly/service/repository/server_test.go b/internal/gitaly/service/repository/server_test.go
index 26778d181..8cbcb1296 100644
--- a/internal/gitaly/service/repository/server_test.go
+++ b/internal/gitaly/service/repository/server_test.go
@@ -4,8 +4,8 @@ import (
"testing"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
"google.golang.org/grpc/metadata"
)
@@ -13,7 +13,7 @@ import (
func TestGetConnectionByStorage(t *testing.T) {
t.Parallel()
connPool := client.NewPool()
- defer connPool.Close()
+ defer testhelper.MustClose(t, connPool)
s := server{conns: connPool}
ctx := testhelper.Context(t)
diff --git a/internal/gitaly/transaction/manager.go b/internal/gitaly/transaction/manager.go
index 16152d16c..6ce6e05e2 100644
--- a/internal/gitaly/transaction/manager.go
+++ b/internal/gitaly/transaction/manager.go
@@ -9,10 +9,9 @@ import (
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
- internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/transaction/txinfo"
"gitlab.com/gitlab-org/gitaly/v16/internal/transaction/voting"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
@@ -67,10 +66,10 @@ type PoolManager struct {
func NewManager(cfg config.Cfg, backchannels *backchannel.Registry) *PoolManager {
return &PoolManager{
backchannels: backchannels,
- conns: client.NewPoolWithOptions(client.WithDialOptions(append(
+ conns: client.NewPool(client.WithDialOptions(append(
client.FailOnNonTempDialError(),
- internalclient.UnaryInterceptor(),
- internalclient.StreamInterceptor())...,
+ client.UnaryInterceptor(),
+ client.StreamInterceptor())...,
)),
votingDelayMetric: prometheus.NewHistogram(
prometheus.HistogramOpts{
diff --git a/internal/grpc/client/dial.go b/internal/grpc/client/dial.go
index 213605f15..a4f076c3a 100644
--- a/internal/grpc/client/dial.go
+++ b/internal/grpc/client/dial.go
@@ -16,6 +16,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
+ healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/keepalive"
"google.golang.org/protobuf/encoding/protojson"
)
@@ -79,7 +80,7 @@ func WithHandshaker(handshaker Handshaker) DialOption {
// `grpc.DialContext()`.
func WithGrpcOptions(opts []grpc.DialOption) DialOption {
return func(cfg *dialConfig) {
- cfg.grpcOpts = opts
+ cfg.grpcOpts = append(cfg.grpcOpts, opts...)
}
}
@@ -248,3 +249,29 @@ func defaultServiceConfig() string {
return string(configJSON)
}
+
+// FailOnNonTempDialError helps to identify if remote listener is ready to accept new connections.
+func FailOnNonTempDialError() []grpc.DialOption {
+ return []grpc.DialOption{
+ grpc.WithBlock(),
+ grpc.FailOnNonTempDialError(true),
+ }
+}
+
+// HealthCheckDialer uses provided dialer as an actual dialer, but issues a health check request to the remote
+// to verify the connection was set properly and could be used with no issues.
+func HealthCheckDialer(base Dialer) Dialer {
+ return func(ctx context.Context, address string, dialOptions []grpc.DialOption) (*grpc.ClientConn, error) {
+ cc, err := base(ctx, address, dialOptions)
+ if err != nil {
+ return nil, err
+ }
+
+ if _, err := healthpb.NewHealthClient(cc).Check(ctx, &healthpb.HealthCheckRequest{}); err != nil {
+ _ = cc.Close()
+ return nil, err
+ }
+
+ return cc, nil
+ }
+}
diff --git a/internal/grpc/client/pool.go b/internal/grpc/client/pool.go
new file mode 100644
index 000000000..a79112dc8
--- /dev/null
+++ b/internal/grpc/client/pool.go
@@ -0,0 +1,128 @@
+package client
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+
+ gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
+ "google.golang.org/grpc"
+)
+
+type poolConfig struct {
+ dialer Dialer
+ dialOptions []grpc.DialOption
+}
+
+// PoolOption is an option that can be passed to NewPool.
+type PoolOption func(*poolConfig)
+
+// Dialer is a dialer used to create new connections.
+type Dialer func(ctx context.Context, address string, opts []grpc.DialOption) (*grpc.ClientConn, error)
+
+// WithDialer sets the dialer that is called for each new gRPC connection the pool establishes.
+func WithDialer(dialer Dialer) PoolOption {
+ return func(options *poolConfig) {
+ options.dialer = dialer
+ }
+}
+
+// WithDialOptions sets gRPC options to use for the gRPC Dial call.
+func WithDialOptions(dialOptions ...grpc.DialOption) PoolOption {
+ return func(options *poolConfig) {
+ options.dialOptions = dialOptions
+ }
+}
+
+type poolKey struct{ address, token string }
+
+// Pool is a pool of GRPC connections. Connections created by it are safe for
+// concurrent use.
+type Pool struct {
+ lock sync.RWMutex
+ conns map[poolKey]*grpc.ClientConn
+ dialer Dialer
+ dialOptions []grpc.DialOption
+}
+
+// NewPool creates a new connection pool that's ready for use.
+func NewPool(opts ...PoolOption) *Pool {
+ cfg := poolConfig{
+ dialer: func(ctx context.Context, address string, opts []grpc.DialOption) (*grpc.ClientConn, error) {
+ return Dial(ctx, address, WithGrpcOptions(opts))
+ },
+ }
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+
+ return &Pool{
+ conns: make(map[poolKey]*grpc.ClientConn),
+ dialer: cfg.dialer,
+ dialOptions: cfg.dialOptions,
+ }
+}
+
+// Close closes all connections tracked by the connection pool.
+func (p *Pool) Close() error {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ var firstError error
+ for addr, conn := range p.conns {
+ if err := conn.Close(); err != nil && firstError == nil {
+ firstError = err
+ }
+
+ delete(p.conns, addr)
+ }
+
+ return firstError
+}
+
+// Dial creates a new client connection in case no connection to the given
+// address exists already or returns an already established connection. The
+// returned address must not be `Close()`d.
+func (p *Pool) Dial(ctx context.Context, address, token string) (*grpc.ClientConn, error) {
+ return p.getOrCreateConnection(ctx, address, token)
+}
+
+func (p *Pool) getOrCreateConnection(ctx context.Context, address, token string) (*grpc.ClientConn, error) {
+ if address == "" {
+ return nil, errors.New("address is empty")
+ }
+
+ key := poolKey{address: address, token: token}
+
+ p.lock.RLock()
+ cc, ok := p.conns[key]
+ p.lock.RUnlock()
+
+ if ok {
+ return cc, nil
+ }
+
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ cc, ok = p.conns[key]
+ if ok {
+ return cc, nil
+ }
+
+ opts := make([]grpc.DialOption, 0, len(p.dialOptions)+1)
+ opts = append(opts, p.dialOptions...)
+ if token != "" {
+ opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token)))
+ }
+
+ cc, err := p.dialer(ctx, address, opts)
+ if err != nil {
+ return nil, fmt.Errorf("could not dial source: %w", err)
+ }
+
+ p.conns[key] = cc
+
+ return cc, nil
+}
diff --git a/internal/grpc/proxy/handler_ext_test.go b/internal/grpc/proxy/handler_ext_test.go
index bfa1236c5..e03945dc9 100644
--- a/internal/grpc/proxy/handler_ext_test.go
+++ b/internal/grpc/proxy/handler_ext_test.go
@@ -18,7 +18,7 @@ import (
"github.com/getsentry/sentry-go"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v16/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
@@ -511,7 +511,7 @@ func TestRegisterStreamHandlers(t *testing.T) {
go testhelper.MustServe(t, server, listener)
defer server.Stop()
- conn, err := client.Dial("tcp://"+listener.Addr().String(), []grpc.DialOption{grpc.WithBlock()})
+ conn, err := client.Dial(ctx, "tcp://"+listener.Addr().String(), client.WithGrpcOptions([]grpc.DialOption{grpc.WithBlock()}))
require.NoError(t, err)
defer conn.Close()
client := grpc_testing.NewTestServiceClient(conn)
diff --git a/internal/grpc/sidechannel/conn.go b/internal/grpc/sidechannel/conn.go
index f77bbcbe3..e76496ea9 100644
--- a/internal/grpc/sidechannel/conn.go
+++ b/internal/grpc/sidechannel/conn.go
@@ -1,12 +1,16 @@
package sidechannel
import (
+ "context"
"fmt"
"io"
"net"
+ "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/streamio"
+ "google.golang.org/grpc"
)
// ServerConn and ClientConn implement an asymmetric framing protocol to
@@ -179,3 +183,10 @@ func (cc *ClientConn) CloseWrite() error {
return nil
}
+
+// Dial configures the dialer to establish a Gitaly backchannel connection instead of a regular gRPC connection. It
+// also injects sr as a sidechannel registry, so that Gitaly can establish sidechannels back to the client.
+func Dial(ctx context.Context, registry *Registry, logger logrus.FieldLogger, rawAddress string, connOpts []grpc.DialOption) (*grpc.ClientConn, error) {
+ clientHandshaker := NewClientHandshaker(logger, registry)
+ return client.Dial(ctx, rawAddress, client.WithGrpcOptions(connOpts), client.WithHandshaker(clientHandshaker))
+}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index feee4a8c8..70837c5ad 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -16,7 +16,6 @@ import (
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/cache"
"gitlab.com/gitlab-org/gitaly/v16/internal/datastructure"
"gitlab.com/gitlab-org/gitaly/v16/internal/featureflag"
@@ -24,6 +23,7 @@ import (
gconfig "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
gitaly_metadata "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
@@ -2158,9 +2158,9 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) {
gitalypb.RegisterOperationServiceServer(srv, operationServer)
}, testserver.WithDiskCache(&mockDiskCache{}), testserver.WithDisablePraefect())
- conn, err := client.DialContext(ctx, addr, []grpc.DialOption{
+ conn, err := client.Dial(ctx, addr, client.WithGrpcOptions([]grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())),
- })
+ }))
require.NoError(t, err)
defer conn.Close()
diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go
index d8905bf6f..0d29c752c 100644
--- a/internal/praefect/nodes/manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -9,8 +9,8 @@ import (
"time"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/datastructure"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
@@ -458,6 +458,8 @@ func TestMgr_GetSyncedNode(t *testing.T) {
}
func TestNodeStatus_IsHealthy(t *testing.T) {
+ ctx := testhelper.Context(t)
+
checkNTimes := func(t *testing.T, ctx context.Context, ns *nodeStatus, n int) {
for i := 0; i < n; i++ {
_, err := ns.CheckHealth(ctx)
@@ -470,12 +472,11 @@ func TestNodeStatus_IsHealthy(t *testing.T) {
healthSrv := testhelper.NewServerWithHealth(t, socket)
- clientConn, err := client.Dial(address, nil)
+ clientConn, err := client.Dial(ctx, address)
require.NoError(t, err)
defer func() { require.NoError(t, clientConn.Close()) }()
node := config.Node{Storage: "gitaly-0", Address: address}
- ctx := testhelper.Context(t)
logger := testhelper.SharedLogger(t)
latencyHistMock := &promtest.MockHistogramVec{}
diff --git a/internal/praefect/nodes/ping.go b/internal/praefect/nodes/ping.go
index 79b730336..fb15cfdec 100644
--- a/internal/praefect/nodes/ping.go
+++ b/internal/praefect/nodes/ping.go
@@ -9,8 +9,7 @@ import (
"sync"
gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
- "gitlab.com/gitlab-org/gitaly/v16/client"
- internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"google.golang.org/grpc"
@@ -73,15 +72,15 @@ func (p *Ping) Address() string {
func (p *Ping) dial(ctx context.Context) (*grpc.ClientConn, error) {
opts := []grpc.DialOption{
grpc.WithBlock(),
- internalclient.UnaryInterceptor(),
- internalclient.StreamInterceptor(),
+ client.UnaryInterceptor(),
+ client.StreamInterceptor(),
}
if len(p.token) > 0 {
opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(p.token)))
}
- return client.DialContext(ctx, p.address, opts)
+ return client.Dial(ctx, p.address, client.WithGrpcOptions(opts))
}
func (p *Ping) healthCheck(ctx context.Context, cc *grpc.ClientConn) (grpc_health_v1.HealthCheckResponse_ServingStatus, error) {
diff --git a/internal/praefect/replicator_pg_test.go b/internal/praefect/replicator_pg_test.go
index 3098e8735..02f59bf6a 100644
--- a/internal/praefect/replicator_pg_test.go
+++ b/internal/praefect/replicator_pg_test.go
@@ -7,8 +7,8 @@ import (
"testing"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/repository"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testdb"
@@ -36,7 +36,7 @@ func TestReplicatorInvalidSourceRepository(t *testing.T) {
defer srv.Stop()
go testhelper.MustServe(t, srv, ln)
- targetCC, err := client.Dial(ln.Addr().Network()+":"+ln.Addr().String(), nil)
+ targetCC, err := client.Dial(ctx, ln.Addr().Network()+":"+ln.Addr().String())
require.NoError(t, err)
defer testhelper.MustClose(t, targetCC)
diff --git a/internal/praefect/service/checks.go b/internal/praefect/service/checks.go
index 1201d7177..bcec3354a 100644
--- a/internal/praefect/service/checks.go
+++ b/internal/praefect/service/checks.go
@@ -11,8 +11,7 @@ import (
migrate "github.com/rubenv/sql-migrate"
gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
- "gitlab.com/gitlab-org/gitaly/v16/client"
- internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/env"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
@@ -263,14 +262,14 @@ func NewClockSyncCheck(clockDriftCheck func(ntpHost string, driftThreshold time.
g.Go(func() error {
opts := []grpc.DialOption{
grpc.WithBlock(),
- internalclient.UnaryInterceptor(),
- internalclient.StreamInterceptor(),
+ client.UnaryInterceptor(),
+ client.StreamInterceptor(),
}
if len(node.Token) > 0 {
opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(node.Token)))
}
- cc, err := client.DialContext(ctx, node.Address, opts)
+ cc, err := client.Dial(ctx, node.Address, client.WithGrpcOptions(opts))
if err != nil {
return fmt.Errorf("%s machine: %w", node.Address, err)
}
diff --git a/internal/praefect/testserver.go b/internal/praefect/testserver.go
index 3ab759937..f3954667d 100644
--- a/internal/praefect/testserver.go
+++ b/internal/praefect/testserver.go
@@ -9,9 +9,9 @@ import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v16/client"
gitalycfgauth "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/auth"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/server/auth"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
@@ -110,7 +110,7 @@ func listenAvailPort(tb testing.TB) (net.Listener, int) {
return listener, listener.Addr().(*net.TCPAddr).Port
}
-func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn {
+func dialLocalPort(tb testing.TB, ctx context.Context, port int, backend bool) *grpc.ClientConn {
opts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithUnaryInterceptor(correlation.UnaryClientCorrelationInterceptor()),
@@ -124,8 +124,9 @@ func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn {
}
cc, err := client.Dial(
+ ctx,
fmt.Sprintf("tcp://localhost:%d", port),
- opts,
+ client.WithGrpcOptions(opts),
)
require.NoError(tb, err)
@@ -275,7 +276,7 @@ func RunPraefectServer(
replMgrDone := startProcessBacklog(ctx, replmgr)
// dial client to praefect
- cc := dialLocalPort(tb, port, false)
+ cc := dialLocalPort(tb, ctx, port, false)
cleanup := func() {
cc.Close()
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index 8cc44245c..04b2ccbdb 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -10,7 +10,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
- "gitlab.com/gitlab-org/gitaly/v16/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/backup"
"gitlab.com/gitlab-org/gitaly/v16/internal/cache"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
@@ -30,7 +29,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitlab"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
- internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
@@ -139,14 +138,14 @@ func (gs GitalyServer) Address() string {
func waitHealthy(tb testing.TB, ctx context.Context, addr string, authToken string) {
grpcOpts := []grpc.DialOption{
grpc.WithBlock(),
- internalclient.UnaryInterceptor(),
- internalclient.StreamInterceptor(),
+ client.UnaryInterceptor(),
+ client.StreamInterceptor(),
}
if authToken != "" {
grpcOpts = append(grpcOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(authToken)))
}
- conn, err := client.DialContext(ctx, addr, grpcOpts)
+ conn, err := client.Dial(ctx, addr, client.WithGrpcOptions(grpcOpts))
require.NoError(tb, err)
defer testhelper.MustClose(tb, conn)
@@ -173,7 +172,7 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d
}
deps := gsd.createDependencies(tb, cfg)
- tb.Cleanup(func() { gsd.conns.Close() })
+ tb.Cleanup(func() { testhelper.MustClose(tb, gsd.conns) })
serverFactory := server.NewGitalyServerFactory(
cfg,
@@ -280,7 +279,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *
}
if gsd.conns == nil {
- gsd.conns = client.NewPool(internalclient.UnaryInterceptor(), internalclient.StreamInterceptor())
+ gsd.conns = client.NewPool(client.WithDialOptions(client.UnaryInterceptor(), client.StreamInterceptor()))
}
if gsd.locator == nil {
diff --git a/tools/test-boot/main.go b/tools/test-boot/main.go
index d27e53856..a9739b00b 100644
--- a/tools/test-boot/main.go
+++ b/tools/test-boot/main.go
@@ -14,7 +14,7 @@ import (
"time"
"github.com/urfave/cli/v2"
- "gitlab.com/gitlab-org/gitaly/v16/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"google.golang.org/grpc"
)
@@ -112,9 +112,9 @@ func spawnAndWait(ctx context.Context, gitalyBin, configPath, socketPath string)
for i := 0; i < 300; i++ {
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
- conn, err := client.DialContext(ctx, "unix://"+socketPath, []grpc.DialOption{
+ conn, err := client.Dial(ctx, "unix://"+socketPath, client.WithGrpcOptions([]grpc.DialOption{
grpc.WithBlock(),
- })
+ }))
cancel()