Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMikhail Mazurskiy <mmazurskiy@gitlab.com>2023-02-09 06:29:59 +0300
committerMikhail Mazurskiy <mmazurskiy@gitlab.com>2023-02-11 07:04:21 +0300
commit9a920e3aa9c261814f563bcbcab861e4b2c2c513 (patch)
tree0c782505b5938955dba6c73b546c9692556f399e
parent856b61b972b9590e56b1cc0c4fae8de872d968a3 (diff)
Extract gRPC client into a separate Go module
-rw-r--r--client/address_parser.go (renamed from internal/gitaly/client/address_parser.go)0
-rw-r--r--client/address_parser_test.go (renamed from internal/gitaly/client/address_parser_test.go)0
-rw-r--r--client/dial.go190
-rw-r--r--client/go.mod3
-rw-r--r--client/internal/backoff/backoff.go (renamed from internal/backoff/backoff.go)0
-rw-r--r--client/internal/backoff/backoff_test.go (renamed from internal/backoff/backoff_test.go)0
-rw-r--r--client/internal/dnsresolver/builder.go (renamed from internal/dnsresolver/builder.go)2
-rw-r--r--client/internal/dnsresolver/noop.go (renamed from internal/dnsresolver/noop.go)0
-rw-r--r--client/internal/dnsresolver/resolver.go (renamed from internal/dnsresolver/resolver.go)2
-rw-r--r--client/internal/dnsresolver/target.go (renamed from internal/dnsresolver/target.go)0
-rw-r--r--client/internal/x509/common.go (renamed from internal/x509/common.go)0
-rw-r--r--client/internal/x509/pool.go (renamed from internal/x509/pool.go)0
-rw-r--r--client/internal/x509/pool_darwin.go (renamed from internal/x509/pool_darwin.go)0
-rw-r--r--client/pool.go3
-rw-r--r--client/rpccredentials.go (renamed from auth/rpccredentials.go)12
-rw-r--r--client/sidechannel.go100
-rw-r--r--cmd/gitaly-hooks/hooks.go3
-rw-r--r--cmd/gitaly-ssh/auth_test.go4
-rw-r--r--cmd/gitaly-ssh/main.go12
-rw-r--r--cmd/gitaly-ssh/receive_pack.go5
-rw-r--r--cmd/gitaly-ssh/upload_archive.go3
-rw-r--r--cmd/gitaly-ssh/upload_pack.go7
-rw-r--r--cmd/praefect/subcmd.go3
-rw-r--r--go.work7
-rw-r--r--internal/auth/README.md (renamed from auth/README.md)0
-rw-r--r--internal/auth/extract_test.go (renamed from auth/extract_test.go)0
-rw-r--r--internal/auth/token.go (renamed from auth/token.go)0
-rw-r--r--internal/client_tests/client_test.go (renamed from client/client_test.go)2
-rw-r--r--internal/client_tests/dial_test.go (renamed from client/dial_test.go)131
-rw-r--r--internal/client_tests/dnsresolver/builder_test.go (renamed from internal/dnsresolver/builder_test.go)9
-rw-r--r--internal/client_tests/dnsresolver/resolver_test.go (renamed from internal/dnsresolver/resolver_test.go)25
-rw-r--r--internal/client_tests/dnsresolver/target_test.go (renamed from internal/dnsresolver/target_test.go)11
-rw-r--r--internal/client_tests/dnsresolver/testhelper_test.go (renamed from internal/dnsresolver/testhelper_test.go)9
-rw-r--r--internal/client_tests/pool_test.go (renamed from client/pool_test.go)44
-rw-r--r--internal/git/command_options.go2
-rw-r--r--internal/git/gittest/repo.go3
-rw-r--r--internal/gitaly/client/dial.go192
-rw-r--r--internal/gitaly/client/dial_test.go68
-rw-r--r--internal/gitaly/client/receive_pack.go (renamed from client/receive_pack.go)0
-rw-r--r--internal/gitaly/client/upload_archive.go (renamed from client/upload_archive.go)0
-rw-r--r--internal/gitaly/client/upload_pack.go (renamed from client/upload_pack.go)5
-rw-r--r--internal/gitaly/server/auth/auth.go2
-rw-r--r--internal/gitaly/server/auth_test.go12
-rw-r--r--internal/gitaly/service/operations/testhelper_test.go2
-rw-r--r--internal/gitaly/service/repository/create_fork_test.go5
-rw-r--r--internal/gitaly/service/repository/testhelper_test.go5
-rw-r--r--internal/gitaly/service/server/info_test.go2
-rw-r--r--internal/gitaly/service/smarthttp/testhelper_test.go2
-rw-r--r--internal/gitaly/service/smarthttp/upload_pack_test.go2
-rw-r--r--internal/gitaly/service/ssh/upload_pack_test.go2
-rw-r--r--internal/middleware/metadatahandler/metadatahandler.go2
-rw-r--r--internal/praefect/auth_test.go2
-rw-r--r--internal/praefect/nodes/manager.go2
-rw-r--r--internal/praefect/nodes/ping.go3
-rw-r--r--internal/praefect/replicator_test.go2
-rw-r--r--internal/praefect/service/checks.go3
-rw-r--r--internal/sidechannel/sidechannel.go100
-rw-r--r--internal/testhelper/testserver/gitaly.go3
-rw-r--r--proto/go.mod16
-rw-r--r--proto/go.sum20
60 files changed, 527 insertions, 517 deletions
diff --git a/internal/gitaly/client/address_parser.go b/client/address_parser.go
index 1d0d560cb..1d0d560cb 100644
--- a/internal/gitaly/client/address_parser.go
+++ b/client/address_parser.go
diff --git a/internal/gitaly/client/address_parser_test.go b/client/address_parser_test.go
index 3069d7e96..3069d7e96 100644
--- a/internal/gitaly/client/address_parser_test.go
+++ b/client/address_parser_test.go
diff --git a/client/dial.go b/client/dial.go
index 56125e6da..a64c690be 100644
--- a/client/dial.go
+++ b/client/dial.go
@@ -2,16 +2,24 @@ package client
import (
"context"
+ "crypto/tls"
+ "fmt"
"math/rand"
+ "net"
+ "net/url"
"time"
"github.com/sirupsen/logrus"
- "gitlab.com/gitlab-org/gitaly/v15/internal/backoff"
- "gitlab.com/gitlab-org/gitaly/v15/internal/dnsresolver"
- "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client"
- "gitlab.com/gitlab-org/gitaly/v15/internal/sidechannel"
+ "gitlab.com/gitlab-org/gitaly/v15/client/internal/backoff"
+ "gitlab.com/gitlab-org/gitaly/v15/client/internal/dnsresolver"
+ "gitlab.com/gitlab-org/gitaly/v15/client/internal/x509"
+ "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
"google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/credentials/insecure"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
+ "google.golang.org/grpc/keepalive"
+ "google.golang.org/protobuf/encoding/protojson"
)
// DefaultDialOpts hold the default DialOptions for connection to Gitaly over UNIX-socket
@@ -27,7 +35,7 @@ var DefaultDialOpts = []grpc.DialOption{}
// connOpts should not contain `grpc.WithInsecure` as DialContext determines whether it is needed or not from the
// scheme. `grpc.TransportCredentials` should not be provided either as those are handled internally as well.
func DialContext(ctx context.Context, rawAddress string, connOpts []grpc.DialOption) (*grpc.ClientConn, error) {
- return client.Dial(ctx, rawAddress, connOpts, nil)
+ return DialHandshaker(ctx, rawAddress, connOpts, nil)
}
// Dial calls DialContext with the provided arguments and context.Background. Refer to DialContext's documentation
@@ -36,13 +44,151 @@ func Dial(rawAddress string, connOpts []grpc.DialOption) (*grpc.ClientConn, erro
return DialContext(context.Background(), rawAddress, connOpts)
}
-// DialSidechannel configures the dialer to establish a Gitaly
-// backchannel connection instead of a regular gRPC connection. It also
-// injects sr as a sidechannel registry, so that Gitaly can establish
-// sidechannels back to the client.
-func DialSidechannel(ctx context.Context, rawAddress string, sr *SidechannelRegistry, connOpts []grpc.DialOption) (*grpc.ClientConn, error) {
- clientHandshaker := sidechannel.NewClientHandshaker(sr.logger, sr.registry)
- return client.Dial(ctx, rawAddress, connOpts, clientHandshaker)
+type connectionType int
+
+const (
+ invalidConnection connectionType = iota
+ tcpConnection
+ tlsConnection
+ unixConnection
+)
+
+func getConnectionType(rawAddress string) connectionType {
+ u, err := url.Parse(rawAddress)
+ if err != nil {
+ return invalidConnection
+ }
+
+ switch u.Scheme {
+ case "tls":
+ return tlsConnection
+ case "unix":
+ return unixConnection
+ case "tcp":
+ return tcpConnection
+ default:
+ return invalidConnection
+ }
+}
+
+// Handshaker is an interface that allows for wrapping the transport credentials
+// with a custom handshake.
+type Handshaker interface {
+ // ClientHandshake wraps the provided credentials and returns new credentials.
+ ClientHandshake(credentials.TransportCredentials) credentials.TransportCredentials
+}
+
+// DialHandshaker dials a Gitaly node serving at the given address.
+//
+// If handshaker is provided, it's passed the transport credentials which would be otherwise set. The transport credentials
+// returned by handshaker are then set instead.
+func DialHandshaker(ctx context.Context, rawAddress string, connOpts []grpc.DialOption, handshaker Handshaker) (*grpc.ClientConn, error) {
+ connOpts = cloneOpts(connOpts) // copy to avoid potentially mutating the backing array of the passed slice
+
+ var canonicalAddress string
+ var err error
+ var transportCredentials credentials.TransportCredentials
+
+ switch getConnectionType(rawAddress) {
+ case invalidConnection:
+ return nil, fmt.Errorf("invalid connection string: %q", rawAddress)
+
+ case tlsConnection:
+ canonicalAddress, err = extractHostFromRemoteURL(rawAddress) // Ensure the form: "host:port" ...
+ if err != nil {
+ return nil, fmt.Errorf("failed to extract host for 'tls' connection: %w", err)
+ }
+
+ certPool, err := x509.SystemCertPool()
+ if err != nil {
+ return nil, fmt.Errorf("failed to get system certificat pool for 'tls' connection: %w", err)
+ }
+
+ transportCredentials = credentials.NewTLS(&tls.Config{
+ RootCAs: certPool,
+ MinVersion: tls.VersionTLS12,
+ })
+
+ case tcpConnection:
+ canonicalAddress, err = extractHostFromRemoteURL(rawAddress) // Ensure the form: "host:port" ...
+ if err != nil {
+ return nil, fmt.Errorf("failed to extract host for 'tcp' connection: %w", err)
+ }
+
+ case unixConnection:
+ canonicalAddress = rawAddress // This will be overridden by the custom dialer...
+ connOpts = append(
+ connOpts,
+ // Use a custom dialer to ensure that we don't experience
+ // issues in environments that have proxy configurations
+ // https://gitlab.com/gitlab-org/gitaly/merge_requests/1072#note_140408512
+ grpc.WithContextDialer(func(ctx context.Context, addr string) (conn net.Conn, err error) {
+ path, err := extractPathFromSocketURL(addr)
+ if err != nil {
+ return nil, fmt.Errorf("failed to extract host for 'unix' connection: %w", err)
+ }
+
+ d := net.Dialer{}
+ return d.DialContext(ctx, "unix", path)
+ }),
+ )
+ }
+
+ if handshaker != nil {
+ if transportCredentials == nil {
+ transportCredentials = insecure.NewCredentials()
+ }
+
+ transportCredentials = handshaker.ClientHandshake(transportCredentials)
+ }
+
+ if transportCredentials == nil {
+ connOpts = append(connOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
+ } else {
+ connOpts = append(connOpts, grpc.WithTransportCredentials(transportCredentials))
+ }
+
+ connOpts = append(connOpts,
+ // grpc.KeepaliveParams must be specified at least as large as what is allowed by the
+ // server-side grpc.KeepaliveEnforcementPolicy
+ grpc.WithKeepaliveParams(keepalive.ClientParameters{
+ Time: 20 * time.Second,
+ PermitWithoutStream: true,
+ }),
+ // grpc.WithDisableServiceConfig ignores the service config provided by resolvers
+ // when they resolve the target. gRPC provides this feature to inject service
+ // config from external sources (DNS TXT record, for example). Gitaly doesn't need
+ // this feature. When we implement a custom client-side load balancer, this feature
+ // can even break the balancer. So, we should better disable it.
+ // For more information, please visit
+ // - https://github.com/grpc/proposal/blob/master/A2-service-configs-in-dns.md
+ grpc.WithDisableServiceConfig(),
+ // grpc.WithDefaultServiceConfig sets the recommended client-side load balancing
+ // configuration to client dial. By default, gRPC clients don't support client-side load
+ // balancing. After the connection to a host is established for the first time, that
+ // client always sticks to that host. In all Gitaly clients, the connection is cached
+ // somehow, usually one connection per host. It means they always stick to the same
+ // host until the process restarts. This is not a problem in pure Gitaly environment.
+ // In a cluster with more than one Praefect node, this behavior may cause serious
+ // workload skew, especially after a fail-over event.
+ //
+ // This option configures the load balancing strategy to `round_robin`. This is a
+ // built-in strategy grpc-go provides. When combining with service discovery via DNS,
+ // a client can distribute its requests to all discovered nodes in a round-robin
+ // fashion. The client can detect the connectivity changes, such as a node goes
+ // down/up again. It evicts subsequent requests accordingly.
+ //
+ // For more information:
+ // https://gitlab.com/groups/gitlab-org/-/epics/8971#note_1207008162
+ grpc.WithDefaultServiceConfig(defaultServiceConfig()),
+ )
+
+ conn, err := grpc.DialContext(ctx, canonicalAddress, connOpts...)
+ if err != nil {
+ return nil, fmt.Errorf("failed to dial %q connection: %w", canonicalAddress, err)
+ }
+
+ return conn, nil
}
// FailOnNonTempDialError helps to identify if remote listener is ready to accept new connections.
@@ -91,3 +237,23 @@ func DefaultDNSResolverBuilderConfig() *DNSResolverBuilderConfig {
func WithGitalyDNSResolver(opts *DNSResolverBuilderConfig) grpc.DialOption {
return grpc.WithResolvers(dnsresolver.NewBuilder((*dnsresolver.BuilderConfig)(opts)))
}
+
+func cloneOpts(opts []grpc.DialOption) []grpc.DialOption {
+ clone := make([]grpc.DialOption, len(opts))
+ copy(clone, opts)
+ return clone
+}
+
+func defaultServiceConfig() string {
+ serviceConfig := &gitalypb.ServiceConfig{
+ LoadBalancingConfig: []*gitalypb.LoadBalancingConfig{{
+ Policy: &gitalypb.LoadBalancingConfig_RoundRobin{},
+ }},
+ }
+ configJSON, err := protojson.Marshal(serviceConfig)
+ if err != nil {
+ panic("fail to convert service config from protobuf to json")
+ }
+
+ return string(configJSON)
+}
diff --git a/client/go.mod b/client/go.mod
new file mode 100644
index 000000000..7d777651b
--- /dev/null
+++ b/client/go.mod
@@ -0,0 +1,3 @@
+module gitlab.com/gitlab-org/gitaly/v15/client
+
+go 1.18
diff --git a/internal/backoff/backoff.go b/client/internal/backoff/backoff.go
index f395c40b0..f395c40b0 100644
--- a/internal/backoff/backoff.go
+++ b/client/internal/backoff/backoff.go
diff --git a/internal/backoff/backoff_test.go b/client/internal/backoff/backoff_test.go
index b33b223b0..b33b223b0 100644
--- a/internal/backoff/backoff_test.go
+++ b/client/internal/backoff/backoff_test.go
diff --git a/internal/dnsresolver/builder.go b/client/internal/dnsresolver/builder.go
index 88a5bd763..1698f10eb 100644
--- a/internal/dnsresolver/builder.go
+++ b/client/internal/dnsresolver/builder.go
@@ -6,7 +6,7 @@ import (
"time"
"github.com/sirupsen/logrus"
- "gitlab.com/gitlab-org/gitaly/v15/internal/backoff"
+ "gitlab.com/gitlab-org/gitaly/v15/client/internal/backoff"
"gitlab.com/gitlab-org/gitaly/v15/internal/structerr"
"google.golang.org/grpc/resolver"
)
diff --git a/internal/dnsresolver/noop.go b/client/internal/dnsresolver/noop.go
index 6197653ed..6197653ed 100644
--- a/internal/dnsresolver/noop.go
+++ b/client/internal/dnsresolver/noop.go
diff --git a/internal/dnsresolver/resolver.go b/client/internal/dnsresolver/resolver.go
index 76be0dbe4..be2126c1b 100644
--- a/internal/dnsresolver/resolver.go
+++ b/client/internal/dnsresolver/resolver.go
@@ -7,7 +7,7 @@ import (
"time"
"github.com/sirupsen/logrus"
- "gitlab.com/gitlab-org/gitaly/v15/internal/backoff"
+ "gitlab.com/gitlab-org/gitaly/v15/client/internal/backoff"
"gitlab.com/gitlab-org/gitaly/v15/internal/structerr"
"google.golang.org/grpc/resolver"
)
diff --git a/internal/dnsresolver/target.go b/client/internal/dnsresolver/target.go
index 6975d0272..6975d0272 100644
--- a/internal/dnsresolver/target.go
+++ b/client/internal/dnsresolver/target.go
diff --git a/internal/x509/common.go b/client/internal/x509/common.go
index 8ca72d208..8ca72d208 100644
--- a/internal/x509/common.go
+++ b/client/internal/x509/common.go
diff --git a/internal/x509/pool.go b/client/internal/x509/pool.go
index 7030fb638..7030fb638 100644
--- a/internal/x509/pool.go
+++ b/client/internal/x509/pool.go
diff --git a/internal/x509/pool_darwin.go b/client/internal/x509/pool_darwin.go
index 851b16d72..851b16d72 100644
--- a/internal/x509/pool_darwin.go
+++ b/client/internal/x509/pool_darwin.go
diff --git a/client/pool.go b/client/pool.go
index 4fcd3835e..c56ba50eb 100644
--- a/client/pool.go
+++ b/client/pool.go
@@ -6,7 +6,6 @@ import (
"fmt"
"sync"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
"google.golang.org/grpc"
)
@@ -89,7 +88,7 @@ func (p *Pool) getOrCreateConnection(ctx context.Context, address, token string)
opts := make([]grpc.DialOption, 0, len(p.dialOptions)+1)
opts = append(opts, p.dialOptions...)
if token != "" {
- opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token)))
+ opts = append(opts, grpc.WithPerRPCCredentials(RPCCredentialsV2(token)))
}
cc, err := p.dialer(ctx, address, opts)
diff --git a/auth/rpccredentials.go b/client/rpccredentials.go
index 9ebf19d15..752724cad 100644
--- a/auth/rpccredentials.go
+++ b/client/rpccredentials.go
@@ -1,7 +1,9 @@
-package gitalyauth
+package client
import (
"context"
+ "crypto/hmac"
+ "crypto/sha256"
"fmt"
"strconv"
"time"
@@ -31,3 +33,11 @@ func (rc2 *rpcCredentialsV2) GetRequestMetadata(context.Context, ...string) (map
"authorization": "Bearer " + fmt.Sprintf("v2.%x.%s", signature, message),
}, nil
}
+
+func hmacSign(secret []byte, message string) []byte {
+ mac := hmac.New(sha256.New, secret)
+ // hash.Hash never returns an error.
+ _, _ = mac.Write([]byte(message))
+
+ return mac.Sum(nil)
+}
diff --git a/client/sidechannel.go b/client/sidechannel.go
deleted file mode 100644
index 100d32738..000000000
--- a/client/sidechannel.go
+++ /dev/null
@@ -1,100 +0,0 @@
-package client
-
-import (
- "context"
- "io"
-
- "github.com/sirupsen/logrus"
- "gitlab.com/gitlab-org/gitaly/v15/internal/backchannel"
- "gitlab.com/gitlab-org/gitaly/v15/internal/listenmux"
- "gitlab.com/gitlab-org/gitaly/v15/internal/sidechannel"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials"
-)
-
-// SidechannelRegistry associates sidechannel callbacks with outbound
-// gRPC calls.
-type SidechannelRegistry struct {
- registry *sidechannel.Registry
- logger *logrus.Entry
-}
-
-// NewSidechannelRegistry returns a new registry.
-func NewSidechannelRegistry(logger *logrus.Entry) *SidechannelRegistry {
- return &SidechannelRegistry{
- registry: sidechannel.NewRegistry(),
- logger: logger,
- }
-}
-
-// Register registers a callback. It adds metadata to ctx and returns the
-// new context. The caller must use the new context for the gRPC call.
-// Caller must Close() the returned SidechannelWaiter to prevent resource
-// leaks.
-func (sr *SidechannelRegistry) Register(
- ctx context.Context,
- callback func(SidechannelConn) error,
-) (context.Context, *SidechannelWaiter) {
- ctx, waiter := sidechannel.RegisterSidechannel(
- ctx,
- sr.registry,
- func(cc *sidechannel.ClientConn) error { return callback(cc) },
- )
- return ctx, &SidechannelWaiter{waiter: waiter}
-}
-
-// SidechannelWaiter represents a pending sidechannel and its callback.
-type SidechannelWaiter struct{ waiter *sidechannel.Waiter }
-
-// Close de-registers the sidechannel callback. If the callback is still
-// running, Close blocks until it is done and returns the error return
-// value of the callback. If the callback has not been called yet, Close
-// returns an error immediately.
-func (w *SidechannelWaiter) Close() error { return w.waiter.Close() }
-
-// SidechannelConn allows a client to read and write bytes with less
-// overhead than doing so via gRPC messages.
-type SidechannelConn interface {
- io.ReadWriter
-
- // CloseWrite tells the server we won't write any more data. We can still
- // read data from the server after CloseWrite(). A typical use case is in
- // an RPC where the byte stream has a request/response pattern: the
- // client then uses CloseWrite() to signal the end of the request body.
- // When the client calls CloseWrite(), the server receives EOF.
- CloseWrite() error
-}
-
-// TestSidechannelServer allows downstream consumers of this package to
-// create mock sidechannel gRPC servers.
-func TestSidechannelServer(
- logger *logrus.Entry,
- creds credentials.TransportCredentials,
- handler func(interface{}, grpc.ServerStream, io.ReadWriteCloser) error,
-) []grpc.ServerOption {
- return []grpc.ServerOption{
- SidechannelServer(logger, creds),
- grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error {
- conn, err := OpenServerSidechannel(stream.Context())
- if err != nil {
- return err
- }
- defer conn.Close()
-
- return handler(srv, stream, conn)
- }),
- }
-}
-
-// SidechannelServer adds sidechannel support to a gRPC server
-func SidechannelServer(logger *logrus.Entry, creds credentials.TransportCredentials) grpc.ServerOption {
- lm := listenmux.New(creds)
- lm.Register(backchannel.NewServerHandshaker(logger, backchannel.NewRegistry(), nil))
- return grpc.Creds(lm)
-}
-
-// OpenServerSidechannel opens a sidechannel on the server side. This
-// only works if the server was created using SidechannelServer().
-func OpenServerSidechannel(ctx context.Context) (io.ReadWriteCloser, error) {
- return sidechannel.OpenSidechannel(ctx)
-}
diff --git a/cmd/gitaly-hooks/hooks.go b/cmd/gitaly-hooks/hooks.go
index 11601d29a..49c4a84c5 100644
--- a/cmd/gitaly-hooks/hooks.go
+++ b/cmd/gitaly-hooks/hooks.go
@@ -9,7 +9,6 @@ import (
"os"
"path/filepath"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
"gitlab.com/gitlab-org/gitaly/v15/client"
"gitlab.com/gitlab-org/gitaly/v15/internal/git"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/hook"
@@ -148,7 +147,7 @@ func noopSender(c chan error) {}
func dialGitaly(payload git.HooksPayload) (*grpc.ClientConn, error) {
dialOpts := client.DefaultDialOpts
if payload.InternalSocketToken != "" {
- dialOpts = append(dialOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(payload.InternalSocketToken)))
+ dialOpts = append(dialOpts, grpc.WithPerRPCCredentials(client.RPCCredentialsV2(payload.InternalSocketToken)))
}
conn, err := client.Dial("unix://"+payload.InternalSocket, dialOpts)
diff --git a/cmd/gitaly-ssh/auth_test.go b/cmd/gitaly-ssh/auth_test.go
index 54149bcf6..b9240de8b 100644
--- a/cmd/gitaly-ssh/auth_test.go
+++ b/cmd/gitaly-ssh/auth_test.go
@@ -16,7 +16,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testserver"
- "gitlab.com/gitlab-org/gitaly/v15/internal/x509"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
"google.golang.org/protobuf/encoding/protojson"
)
@@ -93,7 +92,8 @@ func TestConnectivity(t *testing.T) {
name: "tls",
addr: func(t *testing.T, cfg config.Cfg) (string, string) {
certFile, keyFile := testhelper.GenerateCerts(t)
- t.Setenv(x509.SSLCertFile, certFile)
+ // Read by "gitlab.com/gitlab-org/gitaly/v15/client/internal/x509"
+ t.Setenv("SSL_CERT_FILE", certFile)
cfg.TLSListenAddr = "localhost:0"
cfg.TLS = config.TLS{
diff --git a/cmd/gitaly-ssh/main.go b/cmd/gitaly-ssh/main.go
index 7284ca6c0..6d6d87cce 100644
--- a/cmd/gitaly-ssh/main.go
+++ b/cmd/gitaly-ssh/main.go
@@ -9,15 +9,15 @@ import (
"strings"
"github.com/sirupsen/logrus"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
"gitlab.com/gitlab-org/gitaly/v15/client"
internalclient "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client"
"gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/sidechannel"
"gitlab.com/gitlab-org/labkit/tracing"
"google.golang.org/grpc"
)
-type packFn func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry, string) (int32, error)
+type packFn func(context.Context, *grpc.ClientConn, *sidechannel.SidechannelRegistry, string) (int32, error)
type gitalySSHCommand struct {
// The git packer that shall be executed. One of receivePack,
@@ -111,7 +111,7 @@ func (cmd gitalySSHCommand) run() (int, error) {
}
}
- registry := client.NewSidechannelRegistry(logrus.NewEntry(logrus.StandardLogger()))
+ registry := sidechannel.NewSidechannelRegistry(logrus.NewEntry(logrus.StandardLogger()))
conn, err := getConnection(ctx, cmd.address, registry)
if err != nil {
return 1, err
@@ -126,13 +126,13 @@ func (cmd gitalySSHCommand) run() (int, error) {
return int(code), nil
}
-func getConnection(ctx context.Context, url string, registry *client.SidechannelRegistry) (*grpc.ClientConn, error) {
+func getConnection(ctx context.Context, url string, registry *sidechannel.SidechannelRegistry) (*grpc.ClientConn, error) {
if url == "" {
return nil, fmt.Errorf("gitaly address can not be empty")
}
if useSidechannel() {
- return client.DialSidechannel(ctx, url, registry, dialOpts())
+ return sidechannel.Dial(ctx, url, registry, dialOpts())
}
return client.DialContext(ctx, url, dialOpts())
@@ -141,7 +141,7 @@ func getConnection(ctx context.Context, url string, registry *client.Sidechannel
func dialOpts() []grpc.DialOption {
connOpts := client.DefaultDialOpts
if token := os.Getenv("GITALY_TOKEN"); token != "" {
- connOpts = append(connOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token)))
+ connOpts = append(connOpts, grpc.WithPerRPCCredentials(client.RPCCredentialsV2(token)))
}
return append(connOpts, internalclient.UnaryInterceptor(), internalclient.StreamInterceptor())
diff --git a/cmd/gitaly-ssh/receive_pack.go b/cmd/gitaly-ssh/receive_pack.go
index 0c8ed00dd..de1acdf85 100644
--- a/cmd/gitaly-ssh/receive_pack.go
+++ b/cmd/gitaly-ssh/receive_pack.go
@@ -5,13 +5,14 @@ import (
"fmt"
"os"
- "gitlab.com/gitlab-org/gitaly/v15/client"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/sidechannel"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
"google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson"
)
-func receivePack(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry, req string) (int32, error) {
+func receivePack(ctx context.Context, conn *grpc.ClientConn, registry *sidechannel.SidechannelRegistry, req string) (int32, error) {
var request gitalypb.SSHReceivePackRequest
if err := protojson.Unmarshal([]byte(req), &request); err != nil {
diff --git a/cmd/gitaly-ssh/upload_archive.go b/cmd/gitaly-ssh/upload_archive.go
index eb1eea6de..7efebe830 100644
--- a/cmd/gitaly-ssh/upload_archive.go
+++ b/cmd/gitaly-ssh/upload_archive.go
@@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
+ client2 "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client"
"os"
"gitlab.com/gitlab-org/gitaly/v15/client"
@@ -20,5 +21,5 @@ func uploadArchive(ctx context.Context, conn *grpc.ClientConn, registry *client.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
- return client.UploadArchive(ctx, conn, os.Stdin, os.Stdout, os.Stderr, &request)
+ return client2.UploadArchive(ctx, conn, os.Stdin, os.Stdout, os.Stderr, &request)
}
diff --git a/cmd/gitaly-ssh/upload_pack.go b/cmd/gitaly-ssh/upload_pack.go
index 2901d490a..faa6daf22 100644
--- a/cmd/gitaly-ssh/upload_pack.go
+++ b/cmd/gitaly-ssh/upload_pack.go
@@ -5,7 +5,8 @@ import (
"fmt"
"os"
- "gitlab.com/gitlab-org/gitaly/v15/client"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/sidechannel"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
"google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson"
@@ -22,7 +23,7 @@ func uploadPackConfig(config []string) []string {
return append([]string{GitConfigShowAllRefs}, config...)
}
-func uploadPack(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry, req string) (int32, error) {
+func uploadPack(ctx context.Context, conn *grpc.ClientConn, registry *sidechannel.SidechannelRegistry, req string) (int32, error) {
var request gitalypb.SSHUploadPackRequest
if err := protojson.Unmarshal([]byte(req), &request); err != nil {
return 0, fmt.Errorf("json unmarshal: %w", err)
@@ -36,7 +37,7 @@ func uploadPack(ctx context.Context, conn *grpc.ClientConn, registry *client.Sid
return client.UploadPack(ctx, conn, os.Stdin, os.Stdout, os.Stderr, &request)
}
-func uploadPackWithSidechannel(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry, req string) (int32, error) {
+func uploadPackWithSidechannel(ctx context.Context, conn *grpc.ClientConn, registry *sidechannel.SidechannelRegistry, req string) (int32, error) {
var request gitalypb.SSHUploadPackWithSidechannelRequest
if err := protojson.Unmarshal([]byte(req), &request); err != nil {
return 0, fmt.Errorf("json unmarshal: %w", err)
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go
index 0ac247f8f..1804abfa7 100644
--- a/cmd/praefect/subcmd.go
+++ b/cmd/praefect/subcmd.go
@@ -11,7 +11,6 @@ import (
"time"
"github.com/sirupsen/logrus"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
"gitlab.com/gitlab-org/gitaly/v15/client"
internalclient "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
@@ -133,7 +132,7 @@ func subCmdDial(ctx context.Context, addr, token string, timeout time.Duration,
if len(token) > 0 {
opts = append(opts,
grpc.WithPerRPCCredentials(
- gitalyauth.RPCCredentialsV2(token),
+ client.RPCCredentialsV2(token),
),
)
}
diff --git a/go.work b/go.work
new file mode 100644
index 000000000..67c3839b0
--- /dev/null
+++ b/go.work
@@ -0,0 +1,7 @@
+go 1.18
+
+use (
+ .
+ ./client
+ ./proto
+)
diff --git a/auth/README.md b/internal/auth/README.md
index b4c065ac2..b4c065ac2 100644
--- a/auth/README.md
+++ b/internal/auth/README.md
diff --git a/auth/extract_test.go b/internal/auth/extract_test.go
index 4d136fe10..4d136fe10 100644
--- a/auth/extract_test.go
+++ b/internal/auth/extract_test.go
diff --git a/auth/token.go b/internal/auth/token.go
index 7a32b8c78..7a32b8c78 100644
--- a/auth/token.go
+++ b/internal/auth/token.go
diff --git a/client/client_test.go b/internal/client_tests/client_test.go
index 8cd8c2c02..d0528b921 100644
--- a/client/client_test.go
+++ b/internal/client_tests/client_test.go
@@ -1,4 +1,4 @@
-package client
+package client_tests
import (
"testing"
diff --git a/client/dial_test.go b/internal/client_tests/dial_test.go
index bc05b1b75..ee724afcf 100644
--- a/client/dial_test.go
+++ b/internal/client_tests/dial_test.go
@@ -1,9 +1,10 @@
-package client
+package client_tests
import (
"context"
"crypto/tls"
"fmt"
+ client2 "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client"
"io"
"net"
"os"
@@ -17,10 +18,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-client-go"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
- internalclient "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client"
+ "gitlab.com/gitlab-org/gitaly/v15/client"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/backchannel"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/listenmux"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/sidechannel"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
- gitalyx509 "gitlab.com/gitlab-org/gitaly/v15/internal/x509"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/correlation"
grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc"
@@ -34,6 +36,55 @@ import (
"google.golang.org/grpc/test/grpc_testing"
)
+func TestDialHandshaker(t *testing.T) {
+ errNonMuxed := status.Error(codes.Internal, "non-muxed connection")
+ errMuxed := status.Error(codes.Internal, "muxed connection")
+
+ logger := testhelper.NewDiscardingLogEntry(t)
+
+ lm := listenmux.New(insecure.NewCredentials())
+ lm.Register(backchannel.NewServerHandshaker(logger, backchannel.NewRegistry(), nil))
+
+ srv := grpc.NewServer(
+ grpc.Creds(lm),
+ grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error {
+ _, err := backchannel.GetPeerID(stream.Context())
+ if err == backchannel.ErrNonMultiplexedConnection {
+ return errNonMuxed
+ }
+
+ assert.NoError(t, err)
+ return errMuxed
+ }),
+ )
+ defer srv.Stop()
+
+ ln, err := net.Listen("tcp", "localhost:0")
+ require.NoError(t, err)
+
+ go testhelper.MustServe(t, srv, ln)
+ ctx := testhelper.Context(t)
+
+ t.Run("non-muxed conn", func(t *testing.T) {
+ nonMuxedConn, err := client.DialHandshaker(ctx, "tcp://"+ln.Addr().String(), nil, nil)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, nonMuxedConn.Close()) }()
+
+ dialErr := nonMuxedConn.Invoke(ctx, "/Service/Method", &gitalypb.VoteTransactionRequest{}, &gitalypb.VoteTransactionResponse{})
+ testhelper.RequireGrpcError(t, errNonMuxed, dialErr)
+ })
+
+ t.Run("muxed conn", func(t *testing.T) {
+ handshaker := backchannel.NewClientHandshaker(logger, func() backchannel.Server { return grpc.NewServer() }, backchannel.DefaultConfiguration())
+ nonMuxedConn, err := client.DialHandshaker(ctx, "tcp://"+ln.Addr().String(), nil, handshaker)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, nonMuxedConn.Close()) }()
+
+ dialErr := nonMuxedConn.Invoke(ctx, "/Service/Method", &gitalypb.VoteTransactionRequest{}, &gitalypb.VoteTransactionResponse{})
+ testhelper.RequireGrpcError(t, errMuxed, dialErr)
+ })
+}
+
var proxyEnvironmentKeys = []string{"http_proxy", "https_proxy", "no_proxy"}
func TestDial(t *testing.T) {
@@ -119,7 +170,7 @@ func TestDial(t *testing.T) {
{
name: "dial fail if there is no listener on address",
rawAddress: "tcp://invalid.address",
- dialOpts: FailOnNonTempDialError(),
+ dialOpts: client.FailOnNonTempDialError(),
expectDialFailure: true,
},
}
@@ -131,13 +182,14 @@ func TestDial(t *testing.T) {
}
if tt.envSSLCertFile != "" {
- t.Setenv(gitalyx509.SSLCertFile, tt.envSSLCertFile)
+ // Read in gitlab.com/gitlab-org/gitaly/v15/client/internal/x509.
+ t.Setenv("SSL_CERT_FILE", tt.envSSLCertFile)
}
ctx := testhelper.Context(t)
- dialOpts := append(tt.dialOpts, WithGitalyDNSResolver(DefaultDNSResolverBuilderConfig()))
- conn, err := Dial(tt.rawAddress, dialOpts)
+ dialOpts := append(tt.dialOpts, client.WithGitalyDNSResolver(client.DefaultDNSResolverBuilderConfig()))
+ conn, err := client.Dial(tt.rawAddress, dialOpts)
if tt.expectDialFailure {
require.Error(t, err)
return
@@ -161,7 +213,7 @@ func TestDialSidechannel(t *testing.T) {
}
stop, connectionMap := startListeners(t, func(creds credentials.TransportCredentials) *grpc.Server {
- return grpc.NewServer(TestSidechannelServer(newLogger(t), creds, func(
+ return grpc.NewServer(sidechannel.TestSidechannelServer(newLogger(t), creds, func(
_ interface{},
stream grpc.ServerStream,
sidechannelConn io.ReadWriteCloser,
@@ -195,7 +247,7 @@ func TestDialSidechannel(t *testing.T) {
unixSocketPath := filepath.Join(tempDir, "gitaly.socket")
require.NoError(t, os.Symlink(unixSocketAbsPath, unixSocketPath))
- registry := NewSidechannelRegistry(newLogger(t))
+ registry := sidechannel.NewSidechannelRegistry(newLogger(t))
tests := []struct {
name string
@@ -221,17 +273,18 @@ func TestDialSidechannel(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.envSSLCertFile != "" {
- t.Setenv(gitalyx509.SSLCertFile, tt.envSSLCertFile)
+ // Read in gitlab.com/gitlab-org/gitaly/v15/client/internal/x509.
+ t.Setenv("SSL_CERT_FILE", tt.envSSLCertFile)
}
ctx := testhelper.Context(t)
- dialOpts := append(tt.dialOpts, WithGitalyDNSResolver(DefaultDNSResolverBuilderConfig()))
- conn, err := DialSidechannel(ctx, tt.rawAddress, registry, dialOpts)
+ dialOpts := append(tt.dialOpts, client.WithGitalyDNSResolver(client.DefaultDNSResolverBuilderConfig()))
+ conn, err := sidechannel.Dial(ctx, tt.rawAddress, registry, dialOpts)
require.NoError(t, err)
defer testhelper.MustClose(t, conn)
- ctx, scw := registry.Register(ctx, func(conn SidechannelConn) error {
+ ctx, scw := registry.Register(ctx, func(conn sidechannel.SidechannelConn) error {
const message = "hello world"
if _, err := io.WriteString(conn, message); err != nil {
return err
@@ -303,10 +356,10 @@ func TestDial_Correlation(t *testing.T) {
defer grpcServer.Stop()
ctx := testhelper.Context(t)
- cc, err := DialContext(ctx, "unix://"+serverSocketPath, []grpc.DialOption{
- internalclient.UnaryInterceptor(),
- internalclient.StreamInterceptor(),
- WithGitalyDNSResolver(DefaultDNSResolverBuilderConfig()),
+ cc, err := client.DialContext(ctx, "unix://"+serverSocketPath, []grpc.DialOption{
+ client2.UnaryInterceptor(),
+ client2.StreamInterceptor(),
+ client.WithGitalyDNSResolver(client.DefaultDNSResolverBuilderConfig()),
})
require.NoError(t, err)
defer testhelper.MustClose(t, cc)
@@ -340,10 +393,10 @@ func TestDial_Correlation(t *testing.T) {
defer grpcServer.Stop()
ctx := testhelper.Context(t)
- cc, err := DialContext(ctx, "unix://"+serverSocketPath, []grpc.DialOption{
- internalclient.UnaryInterceptor(),
- internalclient.StreamInterceptor(),
- WithGitalyDNSResolver(DefaultDNSResolverBuilderConfig()),
+ cc, err := client.DialContext(ctx, "unix://"+serverSocketPath, []grpc.DialOption{
+ client2.UnaryInterceptor(),
+ client2.StreamInterceptor(),
+ client.WithGitalyDNSResolver(client.DefaultDNSResolverBuilderConfig()),
})
require.NoError(t, err)
defer testhelper.MustClose(t, cc)
@@ -415,10 +468,10 @@ func TestDial_Tracing(t *testing.T) {
// This needs to be run after setting up the global tracer as it will cause us to
// create the span when executing the RPC call further down below.
- cc, err := DialContext(ctx, "unix://"+serverSocketPath, []grpc.DialOption{
- internalclient.UnaryInterceptor(),
- internalclient.StreamInterceptor(),
- WithGitalyDNSResolver(DefaultDNSResolverBuilderConfig()),
+ cc, err := client.DialContext(ctx, "unix://"+serverSocketPath, []grpc.DialOption{
+ client2.UnaryInterceptor(),
+ client2.StreamInterceptor(),
+ client.WithGitalyDNSResolver(client.DefaultDNSResolverBuilderConfig()),
})
require.NoError(t, err)
defer testhelper.MustClose(t, cc)
@@ -476,10 +529,10 @@ func TestDial_Tracing(t *testing.T) {
// This needs to be run after setting up the global tracer as it will cause us to
// create the span when executing the RPC call further down below.
- cc, err := DialContext(ctx, "unix://"+serverSocketPath, []grpc.DialOption{
- internalclient.UnaryInterceptor(),
- internalclient.StreamInterceptor(),
- WithGitalyDNSResolver(DefaultDNSResolverBuilderConfig()),
+ cc, err := client.DialContext(ctx, "unix://"+serverSocketPath, []grpc.DialOption{
+ client2.UnaryInterceptor(),
+ client2.StreamInterceptor(),
+ client.WithGitalyDNSResolver(client.DefaultDNSResolverBuilderConfig()),
})
require.NoError(t, err)
defer testhelper.MustClose(t, cc)
@@ -643,14 +696,14 @@ func TestHealthCheckDialer(t *testing.T) {
defer cleanup()
ctx := testhelper.Context(t)
- _, err := HealthCheckDialer(DialContext)(ctx, addr, nil)
+ _, err := client.HealthCheckDialer(client.DialContext)(ctx, addr, nil)
testhelper.RequireGrpcError(t, status.Error(codes.Unauthenticated, "authentication required"), err)
- cc, err := HealthCheckDialer(DialContext)(ctx, addr, []grpc.DialOption{
- grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2("token")),
- internalclient.UnaryInterceptor(),
- internalclient.StreamInterceptor(),
- WithGitalyDNSResolver(DefaultDNSResolverBuilderConfig()),
+ cc, err := client.HealthCheckDialer(client.DialContext)(ctx, addr, []grpc.DialOption{
+ grpc.WithPerRPCCredentials(client.RPCCredentialsV2("token")),
+ client2.UnaryInterceptor(),
+ client2.StreamInterceptor(),
+ client.WithGitalyDNSResolver(client.DefaultDNSResolverBuilderConfig()),
})
require.NoError(t, err)
require.NoError(t, cc.Close())
@@ -738,7 +791,7 @@ func verifyDNSConnection(t *testing.T, dial func(*testing.T, string, []grpc.Dial
target,
[]grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
- WithGitalyDNSResolver(DefaultDNSResolverBuilderConfig()),
+ client.WithGitalyDNSResolver(client.DefaultDNSResolverBuilderConfig()),
},
)
require.NoError(t, err)
@@ -770,7 +823,7 @@ func TestWithGitalyDNSResolver_zeroAddresses(t *testing.T) {
target,
[]grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
- WithGitalyDNSResolver(DefaultDNSResolverBuilderConfig()),
+ client.WithGitalyDNSResolver(client.DefaultDNSResolverBuilderConfig()),
},
)
require.NoError(t, err)
@@ -795,7 +848,7 @@ func startFakeGitalyServer(t *testing.T) string {
listener, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
- srv := grpc.NewServer(SidechannelServer(newLogger(t), insecure.NewCredentials()))
+ srv := grpc.NewServer(sidechannel.SidechannelServer(newLogger(t), insecure.NewCredentials()))
gitalypb.RegisterCommitServiceServer(srv, &fakeCommitServer{})
go testhelper.MustServe(t, srv, listener)
t.Cleanup(srv.Stop)
diff --git a/internal/dnsresolver/builder_test.go b/internal/client_tests/dnsresolver/builder_test.go
index 8942cccb1..f8ce28498 100644
--- a/internal/dnsresolver/builder_test.go
+++ b/internal/client_tests/dnsresolver/builder_test.go
@@ -7,6 +7,7 @@ import (
"github.com/miekg/dns"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/client/internal/dnsresolver"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
"google.golang.org/grpc/resolver"
)
@@ -68,7 +69,7 @@ func TestBuildDNSBuilder_customAuthorityResolver(t *testing.T) {
return nil
}).Start()
- builder := NewBuilder(&BuilderConfig{
+ builder := dnsresolver.NewBuilder(&dnsresolver.BuilderConfig{
RefreshRate: 0,
Logger: testhelper.NewDiscardingLogger(t),
Backoff: &fakeBackoff{},
@@ -114,7 +115,7 @@ func TestBuildDNSBuilder_staticIPAddress(t *testing.T) {
return nil
}).Start()
- builder := NewBuilder(&BuilderConfig{
+ builder := dnsresolver.NewBuilder(&dnsresolver.BuilderConfig{
RefreshRate: 0,
Logger: testhelper.NewDiscardingLogger(t),
Backoff: &fakeBackoff{},
@@ -130,7 +131,7 @@ func TestBuildDNSBuilder_staticIPAddress(t *testing.T) {
Addr: tc.addr,
}}}}, conn.states)
- require.IsType(t, &noopResolver{}, r, "building a resolver for IP address should return a no-op resolver")
+ require.IsType(t, &dnsresolver.noopResolver{}, r, "building a resolver for IP address should return a no-op resolver")
})
}
}
@@ -138,6 +139,6 @@ func TestBuildDNSBuilder_staticIPAddress(t *testing.T) {
func TestSchemeDNSBuilder(t *testing.T) {
t.Parallel()
- d := &Builder{}
+ d := &dnsresolver.Builder{}
require.Equal(t, d.Scheme(), "dns")
}
diff --git a/internal/dnsresolver/resolver_test.go b/internal/client_tests/dnsresolver/resolver_test.go
index b955cc17d..71fb2cfaf 100644
--- a/internal/dnsresolver/resolver_test.go
+++ b/internal/client_tests/dnsresolver/resolver_test.go
@@ -3,6 +3,7 @@ package dnsresolver
import (
"context"
"fmt"
+ "gitlab.com/gitlab-org/gitaly/v15/client/internal/dnsresolver"
"math/rand"
"net"
"testing"
@@ -10,7 +11,7 @@ import (
"github.com/miekg/dns"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v15/internal/backoff"
+ "gitlab.com/gitlab-org/gitaly/v15/client/internal/backoff"
"gitlab.com/gitlab-org/gitaly/v15/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc"
@@ -33,7 +34,7 @@ func TestDnsResolver(t *testing.T) {
}{
{
name: "resolver updates a single IPv4 each resolution",
- setup: func(server *testhelper.FakeDNSServer, _ *Builder) *fakeClientConn {
+ setup: func(server *testhelper.FakeDNSServer, _ *dnsresolver.Builder) *fakeClientConn {
ips := ipList{ips: [][]string{
{"1.2.3.4"},
{"1.2.3.5"},
@@ -57,7 +58,7 @@ func TestDnsResolver(t *testing.T) {
},
{
name: "resolver updates multiple IPv4 each resolution",
- setup: func(server *testhelper.FakeDNSServer, _ *Builder) *fakeClientConn {
+ setup: func(server *testhelper.FakeDNSServer, _ *dnsresolver.Builder) *fakeClientConn {
ips := ipList{ips: [][]string{
{"1.2.3.4", "1.2.3.5"},
{"1.2.3.6"},
@@ -81,7 +82,7 @@ func TestDnsResolver(t *testing.T) {
},
{
name: "resolver updates multiple IPv6 each resolution",
- setup: func(server *testhelper.FakeDNSServer, _ *Builder) *fakeClientConn {
+ setup: func(server *testhelper.FakeDNSServer, _ *dnsresolver.Builder) *fakeClientConn {
ips := ipList{ips: [][]string{
{"::1", "::2"},
{"::3", "::4"},
@@ -105,7 +106,7 @@ func TestDnsResolver(t *testing.T) {
},
{
name: "resolver resolves address without port",
- setup: func(server *testhelper.FakeDNSServer, _ *Builder) *fakeClientConn {
+ setup: func(server *testhelper.FakeDNSServer, _ *dnsresolver.Builder) *fakeClientConn {
ips := ipList{ips: [][]string{
{"1.2.3.4"},
{"1.2.3.5"},
@@ -129,7 +130,7 @@ func TestDnsResolver(t *testing.T) {
},
{
name: "resolver retries with exponential Backoff when client connection fails to update",
- setup: func(server *testhelper.FakeDNSServer, _ *Builder) *fakeClientConn {
+ setup: func(server *testhelper.FakeDNSServer, _ *dnsresolver.Builder) *fakeClientConn {
conn := newFakeClientConn(2, 0)
connErr := 2
conn.customUpdateState = func(state resolver.State) error {
@@ -163,7 +164,7 @@ func TestDnsResolver(t *testing.T) {
},
{
name: "DNS nameserver returns empty addresses",
- setup: func(server *testhelper.FakeDNSServer, _ *Builder) *fakeClientConn {
+ setup: func(server *testhelper.FakeDNSServer, _ *dnsresolver.Builder) *fakeClientConn {
server.WithHandler(dns.TypeA, func(_ string) []string {
return nil
})
@@ -178,7 +179,7 @@ func TestDnsResolver(t *testing.T) {
},
{
name: "DNS nameserver raises timeout error",
- setup: func(server *testhelper.FakeDNSServer, builder *Builder) *fakeClientConn {
+ setup: func(server *testhelper.FakeDNSServer, builder *dnsresolver.Builder) *fakeClientConn {
ips := ipList{ips: [][]string{
{"1.2.3.4"},
{},
@@ -186,7 +187,7 @@ func TestDnsResolver(t *testing.T) {
{"1.2.3.6"},
}}
- builder.opts.authorityFinder = func(authority string) (dnsLookuper, error) {
+ builder.opts.authorityFinder = func(authority string) (dnsresolver.dnsLookuper, error) {
f := newFakeLookup(t, authority)
f.stubLookup = func(ctx context.Context, host string) ([]string, error) {
nextIPs := ips.peek()
@@ -226,7 +227,7 @@ func TestDnsResolver(t *testing.T) {
},
{
name: "DNS nameserver raises a temporary error",
- setup: func(server *testhelper.FakeDNSServer, builder *Builder) *fakeClientConn {
+ setup: func(server *testhelper.FakeDNSServer, builder *dnsresolver.Builder) *fakeClientConn {
ips := ipList{ips: [][]string{
{"1.2.3.4"},
{},
@@ -238,7 +239,7 @@ func TestDnsResolver(t *testing.T) {
{"1.2.3.6"},
}}
- builder.opts.authorityFinder = func(authority string) (dnsLookuper, error) {
+ builder.opts.authorityFinder = func(authority string) (dnsresolver.dnsLookuper, error) {
f := newFakeLookup(t, authority)
f.stubLookup = func(ctx context.Context, host string) ([]string, error) {
nextIPs := ips.peek()
@@ -341,7 +342,7 @@ func TestDnsResolver_grpcCallWithOurDNSResolver(t *testing.T) {
conn, err := grpc.Dial(
target.URL.String(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
- grpc.WithResolvers(NewBuilder(&BuilderConfig{
+ grpc.WithResolvers(dnsresolver.NewBuilder(&dnsresolver.BuilderConfig{
RefreshRate: 0, // No delay
Logger: testhelper.NewDiscardingLogger(t),
DefaultGrpcPort: "1234",
diff --git a/internal/dnsresolver/target_test.go b/internal/client_tests/dnsresolver/target_test.go
index e4333d535..164d9e798 100644
--- a/internal/dnsresolver/target_test.go
+++ b/internal/client_tests/dnsresolver/target_test.go
@@ -2,6 +2,7 @@ package dnsresolver
import (
"fmt"
+ "gitlab.com/gitlab-org/gitaly/v15/client/internal/dnsresolver"
"net"
"testing"
@@ -14,7 +15,7 @@ import (
func TestFindDNSLookup_default(t *testing.T) {
t.Parallel()
- resolver, err := findDNSLookup("")
+ resolver, err := dnsresolver.findDNSLookup("")
require.NoError(t, err)
require.Equal(t, net.DefaultResolver, resolver)
}
@@ -22,7 +23,7 @@ func TestFindDNSLookup_default(t *testing.T) {
func TestFindDNSLookup_invalidAuthority(t *testing.T) {
t.Parallel()
- resolver, err := findDNSLookup("this:is:not:good")
+ resolver, err := dnsresolver.findDNSLookup("this:is:not:good")
require.ErrorIs(t, structerr.New("dns resolver: %w", &net.AddrError{
Err: "too many colons in address",
Addr: "this:is:not:good:53",
@@ -40,7 +41,7 @@ func TestFindDNSLookup_validAuthority(t *testing.T) {
return nil
}).Start()
- resolver, err := findDNSLookup(fakeServer.Addr())
+ resolver, err := dnsresolver.findDNSLookup(fakeServer.Addr())
require.NoError(t, err)
addrs, err := resolver.LookupHost(testhelper.Context(t), "grpc.test")
@@ -154,7 +155,7 @@ func TestParseTarget(t *testing.T) {
}
for _, tc := range tests {
t.Run(fmt.Sprintf("target: %s", tc.target), func(t *testing.T) {
- host, port, err := parseTarget(tc.target, defaultPort)
+ host, port, err := dnsresolver.parseTarget(tc.target, defaultPort)
require.Equal(t, tc.expectedErr, err)
require.Equal(t, tc.expectedHost, host)
@@ -180,7 +181,7 @@ func TestTryParseIP(t *testing.T) {
}
for _, tc := range tests {
t.Run(fmt.Sprintf("host: %s, port: %s", tc.host, tc.port), func(t *testing.T) {
- addr, ok := tryParseIP(tc.host, tc.port)
+ addr, ok := dnsresolver.tryParseIP(tc.host, tc.port)
require.Equal(t, tc.expectedOk, ok)
require.Equal(t, tc.expectedAddr, addr)
diff --git a/internal/dnsresolver/testhelper_test.go b/internal/client_tests/dnsresolver/testhelper_test.go
index 819967837..6f993a4b5 100644
--- a/internal/dnsresolver/testhelper_test.go
+++ b/internal/client_tests/dnsresolver/testhelper_test.go
@@ -3,6 +3,7 @@ package dnsresolver
import (
"context"
"fmt"
+ "gitlab.com/gitlab-org/gitaly/v15/client/internal/dnsresolver"
"net/url"
"sync"
"testing"
@@ -98,7 +99,7 @@ func (c *fakeBackoff) Backoff(uint) time.Duration {
// fakeLookup stubs the DNS lookup. It wraps around a real DNS lookup. The caller can return an
// alternative addresses, errors, or fallback to use the real DNS lookup if needed.
type fakeLookup struct {
- realLookup dnsLookuper
+ realLookup dnsresolver.dnsLookuper
stubLookup func(context.Context, string) ([]string, error)
}
@@ -107,7 +108,7 @@ func (f *fakeLookup) LookupHost(ctx context.Context, s string) ([]string, error)
}
func newFakeLookup(t *testing.T, authority string) *fakeLookup {
- lookup, err := findDNSLookup(authority)
+ lookup, err := dnsresolver.findDNSLookup(authority)
require.NoError(t, err)
return &fakeLookup{realLookup: lookup}
@@ -121,8 +122,8 @@ func buildResolverTarget(s *testhelper.FakeDNSServer, addr string) resolver.Targ
}}
}
-func newTestDNSBuilder(t *testing.T) *Builder {
- return NewBuilder(&BuilderConfig{
+func newTestDNSBuilder(t *testing.T) *dnsresolver.Builder {
+ return dnsresolver.NewBuilder(&dnsresolver.BuilderConfig{
RefreshRate: 0,
Logger: testhelper.NewDiscardingLogger(t),
Backoff: &fakeBackoff{},
diff --git a/client/pool_test.go b/internal/client_tests/pool_test.go
index 37b6b9920..dd57369af 100644
--- a/client/pool_test.go
+++ b/internal/client_tests/pool_test.go
@@ -1,4 +1,4 @@
-package client
+package client_tests
import (
"context"
@@ -9,7 +9,7 @@ import (
grpcmw "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
+ "gitlab.com/gitlab-org/gitaly/v15/client"
"gitlab.com/gitlab-org/gitaly/v15/internal/bootstrap/starter"
gitalycfgauth "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/auth"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/server/auth"
@@ -33,12 +33,12 @@ func TestPoolDial(t *testing.T) {
testCases := []struct {
desc string
- poolOptions []PoolOption
- test func(t *testing.T, ctx context.Context, pool *Pool)
+ poolOptions []client.PoolOption
+ test func(t *testing.T, ctx context.Context, pool *client.Pool)
}{
{
desc: "dialing once succeeds",
- test: func(t *testing.T, ctx context.Context, pool *Pool) {
+ test: func(t *testing.T, ctx context.Context, pool *client.Pool) {
conn, err := pool.Dial(ctx, insecure, "")
require.NoError(t, err)
verifyConnection(t, ctx, conn, codes.OK)
@@ -46,7 +46,7 @@ func TestPoolDial(t *testing.T) {
},
{
desc: "dialing multiple times succeeds",
- test: func(t *testing.T, ctx context.Context, pool *Pool) {
+ test: func(t *testing.T, ctx context.Context, pool *client.Pool) {
for i := 0; i < 10; i++ {
conn, err := pool.Dial(ctx, insecure, "")
require.NoError(t, err)
@@ -56,7 +56,7 @@ func TestPoolDial(t *testing.T) {
},
{
desc: "redialing after close succeeds",
- test: func(t *testing.T, ctx context.Context, pool *Pool) {
+ test: func(t *testing.T, ctx context.Context, pool *client.Pool) {
conn, err := pool.Dial(ctx, insecure, "")
require.NoError(t, err)
verifyConnection(t, ctx, conn, codes.OK)
@@ -70,7 +70,7 @@ func TestPoolDial(t *testing.T) {
},
{
desc: "dialing invalid fails",
- test: func(t *testing.T, ctx context.Context, pool *Pool) {
+ test: func(t *testing.T, ctx context.Context, pool *client.Pool) {
conn, err := pool.Dial(ctx, "foo/bar", "")
require.Error(t, err)
require.Nil(t, conn)
@@ -78,7 +78,7 @@ func TestPoolDial(t *testing.T) {
},
{
desc: "dialing empty fails",
- test: func(t *testing.T, ctx context.Context, pool *Pool) {
+ test: func(t *testing.T, ctx context.Context, pool *client.Pool) {
conn, err := pool.Dial(ctx, "", "")
require.Error(t, err)
require.Nil(t, conn)
@@ -86,7 +86,7 @@ func TestPoolDial(t *testing.T) {
},
{
desc: "dialing concurrently succeeds",
- test: func(t *testing.T, ctx context.Context, pool *Pool) {
+ test: func(t *testing.T, ctx context.Context, pool *client.Pool) {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
@@ -105,7 +105,7 @@ func TestPoolDial(t *testing.T) {
},
{
desc: "dialing with credentials succeeds",
- test: func(t *testing.T, ctx context.Context, pool *Pool) {
+ test: func(t *testing.T, ctx context.Context, pool *client.Pool) {
conn, err := pool.Dial(ctx, secure, creds)
require.NoError(t, err)
verifyConnection(t, ctx, conn, codes.OK)
@@ -113,7 +113,7 @@ func TestPoolDial(t *testing.T) {
},
{
desc: "dialing with invalid credentials fails",
- test: func(t *testing.T, ctx context.Context, pool *Pool) {
+ test: func(t *testing.T, ctx context.Context, pool *client.Pool) {
conn, err := pool.Dial(ctx, secure, "invalid-credential")
require.NoError(t, err)
verifyConnection(t, ctx, conn, codes.PermissionDenied)
@@ -121,7 +121,7 @@ func TestPoolDial(t *testing.T) {
},
{
desc: "dialing with missing credentials fails",
- test: func(t *testing.T, ctx context.Context, pool *Pool) {
+ test: func(t *testing.T, ctx context.Context, pool *client.Pool) {
conn, err := pool.Dial(ctx, secure, "")
require.NoError(t, err)
verifyConnection(t, ctx, conn, codes.Unauthenticated)
@@ -129,10 +129,10 @@ func TestPoolDial(t *testing.T) {
},
{
desc: "dialing with dial options succeeds",
- poolOptions: []PoolOption{
- WithDialOptions(grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(creds))),
+ poolOptions: []client.PoolOption{
+ client.WithDialOptions(grpc.WithPerRPCCredentials(client.RPCCredentialsV2(creds))),
},
- test: func(t *testing.T, ctx context.Context, pool *Pool) {
+ test: func(t *testing.T, ctx context.Context, pool *client.Pool) {
conn, err := pool.Dial(ctx, secure, "") // no creds here
require.NoError(t, err)
verifyConnection(t, ctx, conn, codes.OK) // auth passes
@@ -140,13 +140,13 @@ func TestPoolDial(t *testing.T) {
},
{
desc: "dial options function is invoked per dial",
- poolOptions: []PoolOption{
- WithDialer(func(ctx context.Context, address string, dialOptions []grpc.DialOption) (*grpc.ClientConn, error) {
+ poolOptions: []client.PoolOption{
+ client.WithDialer(func(ctx context.Context, address string, dialOptions []grpc.DialOption) (*grpc.ClientConn, error) {
dialFuncInvocationCounter++
- return DialContext(ctx, address, dialOptions)
+ return client.DialContext(ctx, address, dialOptions)
}),
},
- test: func(t *testing.T, ctx context.Context, pool *Pool) {
+ test: func(t *testing.T, ctx context.Context, pool *client.Pool) {
_, err := pool.Dial(ctx, secure, "")
require.NoError(t, err)
assert.Equal(t, 1, dialFuncInvocationCounter)
@@ -159,7 +159,7 @@ func TestPoolDial(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
- pool := NewPoolWithOptions(tc.poolOptions...)
+ pool := client.NewPoolWithOptions(tc.poolOptions...)
defer func() {
require.NoError(t, pool.Close())
}()
@@ -230,7 +230,7 @@ func TestPool_Dial_same_addr_another_token(t *testing.T) {
_, addr, stop1 := runServer(t, "")
defer func() { stop1() }()
- pool := NewPool()
+ pool := client.NewPool()
defer pool.Close()
// all good - server is running and serving requests
diff --git a/internal/git/command_options.go b/internal/git/command_options.go
index 7b1c3ddb2..118585c97 100644
--- a/internal/git/command_options.go
+++ b/internal/git/command_options.go
@@ -3,6 +3,7 @@ package git
import (
"context"
"fmt"
+ "gitlab.com/gitlab-org/gitaly/v15/client/internal/x509"
"io"
"os"
"regexp"
@@ -13,7 +14,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v15/internal/structerr"
- "gitlab.com/gitlab-org/gitaly/v15/internal/x509"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/correlation"
"google.golang.org/protobuf/encoding/protojson"
diff --git a/internal/git/gittest/repo.go b/internal/git/gittest/repo.go
index 208d189f8..90fdc94d8 100644
--- a/internal/git/gittest/repo.go
+++ b/internal/git/gittest/repo.go
@@ -11,7 +11,6 @@ import (
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
"gitlab.com/gitlab-org/gitaly/v15/client"
"gitlab.com/gitlab-org/gitaly/v15/internal/git"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/repository"
@@ -100,7 +99,7 @@ type CreateRepositoryConfig struct {
func dialService(tb testing.TB, ctx context.Context, cfg config.Cfg) *grpc.ClientConn {
dialOptions := []grpc.DialOption{internalclient.UnaryInterceptor(), internalclient.StreamInterceptor()}
if cfg.Auth.Token != "" {
- dialOptions = append(dialOptions, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(cfg.Auth.Token)))
+ dialOptions = append(dialOptions, grpc.WithPerRPCCredentials(client.RPCCredentialsV2(cfg.Auth.Token)))
}
conn, err := client.DialContext(ctx, cfg.SocketPath, dialOptions)
diff --git a/internal/gitaly/client/dial.go b/internal/gitaly/client/dial.go
index 1858e7310..6497abff4 100644
--- a/internal/gitaly/client/dial.go
+++ b/internal/gitaly/client/dial.go
@@ -1,183 +1,11 @@
package client
import (
- "context"
- "crypto/tls"
- "fmt"
- "net"
- "net/url"
- "time"
-
- "gitlab.com/gitlab-org/gitaly/v15/internal/dnsresolver"
- gitalyx509 "gitlab.com/gitlab-org/gitaly/v15/internal/x509"
- "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc"
grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc"
"google.golang.org/grpc"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/credentials/insecure"
- "google.golang.org/grpc/keepalive"
- "google.golang.org/protobuf/encoding/protojson"
-)
-
-type connectionType int
-
-const (
- invalidConnection connectionType = iota
- tcpConnection
- tlsConnection
- unixConnection
- dnsConnection
)
-func getConnectionType(rawAddress string) connectionType {
- u, err := url.Parse(rawAddress)
- if err != nil {
- return invalidConnection
- }
-
- switch u.Scheme {
- case "tls":
- return tlsConnection
- case "unix":
- return unixConnection
- case "tcp":
- return tcpConnection
- case "dns":
- return dnsConnection
- default:
- return invalidConnection
- }
-}
-
-// Handshaker is an interface that allows for wrapping the transport credentials
-// with a custom handshake.
-type Handshaker interface {
- // ClientHandshake wraps the provided credentials and returns new credentials.
- ClientHandshake(credentials.TransportCredentials) credentials.TransportCredentials
-}
-
-// Dial dials a Gitaly node serving at the given address. Dial is used by the public 'client' package
-// and the expected behavior is mostly documented there.
-//
-// If handshaker is provided, it's passed the transport credentials which would be otherwise set. The transport credentials
-// returned by handshaker are then set instead.
-func Dial(ctx context.Context, rawAddress string, connOpts []grpc.DialOption, handshaker Handshaker) (*grpc.ClientConn, error) {
- connOpts = cloneOpts(connOpts) // copy to avoid potentially mutating the backing array of the passed slice
-
- var canonicalAddress string
- var err error
- var transportCredentials credentials.TransportCredentials
-
- switch getConnectionType(rawAddress) {
- case invalidConnection:
- return nil, fmt.Errorf("invalid connection string: %q", rawAddress)
-
- case tlsConnection:
- canonicalAddress, err = extractHostFromRemoteURL(rawAddress) // Ensure the form: "host:port" ...
- if err != nil {
- return nil, fmt.Errorf("failed to extract host for 'tls' connection: %w", err)
- }
-
- certPool, err := gitalyx509.SystemCertPool()
- if err != nil {
- return nil, fmt.Errorf("failed to get system certificat pool for 'tls' connection: %w", err)
- }
-
- transportCredentials = credentials.NewTLS(&tls.Config{
- RootCAs: certPool,
- MinVersion: tls.VersionTLS12,
- })
-
- case tcpConnection:
- canonicalAddress, err = extractHostFromRemoteURL(rawAddress) // Ensure the form: "host:port" ...
- if err != nil {
- return nil, fmt.Errorf("failed to extract host for 'tcp' connection: %w", err)
- }
-
- case dnsConnection:
- err = dnsresolver.ValidateURL(rawAddress)
- if err != nil {
- return nil, fmt.Errorf("failed to parse target for 'dns' connection: %w", err)
- }
- canonicalAddress = rawAddress // DNS Resolver will handle this
-
- case unixConnection:
- canonicalAddress = rawAddress // This will be overridden by the custom dialer...
- connOpts = append(
- connOpts,
- // Use a custom dialer to ensure that we don't experience
- // issues in environments that have proxy configurations
- // https://gitlab.com/gitlab-org/gitaly/merge_requests/1072#note_140408512
- grpc.WithContextDialer(func(ctx context.Context, addr string) (conn net.Conn, err error) {
- path, err := extractPathFromSocketURL(addr)
- if err != nil {
- return nil, fmt.Errorf("failed to extract host for 'unix' connection: %w", err)
- }
-
- d := net.Dialer{}
- return d.DialContext(ctx, "unix", path)
- }),
- )
- }
-
- if handshaker != nil {
- if transportCredentials == nil {
- transportCredentials = insecure.NewCredentials()
- }
-
- transportCredentials = handshaker.ClientHandshake(transportCredentials)
- }
-
- if transportCredentials == nil {
- connOpts = append(connOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
- } else {
- connOpts = append(connOpts, grpc.WithTransportCredentials(transportCredentials))
- }
-
- connOpts = append(connOpts,
- // grpc.KeepaliveParams must be specified at least as large as what is allowed by the
- // server-side grpc.KeepaliveEnforcementPolicy
- grpc.WithKeepaliveParams(keepalive.ClientParameters{
- Time: 20 * time.Second,
- PermitWithoutStream: true,
- }),
- // grpc.WithDisableServiceConfig ignores the service config provided by resolvers
- // when they resolve the target. gRPC provides this feature to inject service
- // config from external sources (DNS TXT record, for example). Gitaly doesn't need
- // this feature. When we implement a custom client-side load balancer, this feature
- // can even break the balancer. So, we should better disable it.
- // For more information, please visit
- // - https://github.com/grpc/proposal/blob/master/A2-service-configs-in-dns.md
- grpc.WithDisableServiceConfig(),
- // grpc.WithDefaultServiceConfig sets the recommended client-side load balancing
- // configuration to client dial. By default, gRPC clients don't support client-side load
- // balancing. After the connection to a host is established for the first time, that
- // client always sticks to that host. In all Gitaly clients, the connection is cached
- // somehow, usually one connection per host. It means they always stick to the same
- // host until the process restarts. This is not a problem in pure Gitaly environment.
- // In a cluster with more than one Praefect node, this behavior may cause serious
- // workload skew, especially after a fail-over event.
- //
- // This option configures the load balancing strategy to `round_robin`. This is a
- // built-in strategy grpc-go provides. When combining with service discovery via DNS,
- // a client can distribute its requests to all discovered nodes in a round-robin
- // fashion. The client can detect the connectivity changes, such as a node goes
- // down/up again. It evicts subsequent requests accordingly.
- //
- // For more information:
- // https://gitlab.com/groups/gitlab-org/-/epics/8971#note_1207008162
- grpc.WithDefaultServiceConfig(defaultServiceConfig()),
- )
-
- conn, err := grpc.DialContext(ctx, canonicalAddress, connOpts...)
- if err != nil {
- return nil, fmt.Errorf("failed to dial %q connection: %w", canonicalAddress, err)
- }
-
- return conn, nil
-}
-
// StreamInterceptor returns the stream interceptors that should be configured for a client.
func StreamInterceptor() grpc.DialOption {
return grpc.WithChainStreamInterceptor(
@@ -193,23 +21,3 @@ func UnaryInterceptor() grpc.DialOption {
grpccorrelation.UnaryClientCorrelationInterceptor(),
)
}
-
-func cloneOpts(opts []grpc.DialOption) []grpc.DialOption {
- clone := make([]grpc.DialOption, len(opts))
- copy(clone, opts)
- return clone
-}
-
-func defaultServiceConfig() string {
- serviceConfig := &gitalypb.ServiceConfig{
- LoadBalancingConfig: []*gitalypb.LoadBalancingConfig{{
- Policy: &gitalypb.LoadBalancingConfig_RoundRobin{},
- }},
- }
- configJSON, err := protojson.Marshal(serviceConfig)
- if err != nil {
- panic("fail to convert service config from protobuf to json")
- }
-
- return string(configJSON)
-}
diff --git a/internal/gitaly/client/dial_test.go b/internal/gitaly/client/dial_test.go
deleted file mode 100644
index c7c97a10f..000000000
--- a/internal/gitaly/client/dial_test.go
+++ /dev/null
@@ -1,68 +0,0 @@
-//go:build !gitaly_test_sha256
-
-package client
-
-import (
- "net"
- "testing"
-
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v15/internal/backchannel"
- "gitlab.com/gitlab-org/gitaly/v15/internal/listenmux"
- "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
- "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials/insecure"
- "google.golang.org/grpc/status"
-)
-
-func TestDial(t *testing.T) {
- errNonMuxed := status.Error(codes.Internal, "non-muxed connection")
- errMuxed := status.Error(codes.Internal, "muxed connection")
-
- logger := testhelper.NewDiscardingLogEntry(t)
-
- lm := listenmux.New(insecure.NewCredentials())
- lm.Register(backchannel.NewServerHandshaker(logger, backchannel.NewRegistry(), nil))
-
- srv := grpc.NewServer(
- grpc.Creds(lm),
- grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error {
- _, err := backchannel.GetPeerID(stream.Context())
- if err == backchannel.ErrNonMultiplexedConnection {
- return errNonMuxed
- }
-
- assert.NoError(t, err)
- return errMuxed
- }),
- )
- defer srv.Stop()
-
- ln, err := net.Listen("tcp", "localhost:0")
- require.NoError(t, err)
-
- go testhelper.MustServe(t, srv, ln)
- ctx := testhelper.Context(t)
-
- t.Run("non-muxed conn", func(t *testing.T) {
- nonMuxedConn, err := Dial(ctx, "tcp://"+ln.Addr().String(), nil, nil)
- require.NoError(t, err)
- defer func() { require.NoError(t, nonMuxedConn.Close()) }()
-
- dialErr := nonMuxedConn.Invoke(ctx, "/Service/Method", &gitalypb.VoteTransactionRequest{}, &gitalypb.VoteTransactionResponse{})
- testhelper.RequireGrpcError(t, errNonMuxed, dialErr)
- })
-
- t.Run("muxed conn", func(t *testing.T) {
- handshaker := backchannel.NewClientHandshaker(logger, func() backchannel.Server { return grpc.NewServer() }, backchannel.DefaultConfiguration())
- nonMuxedConn, err := Dial(ctx, "tcp://"+ln.Addr().String(), nil, handshaker)
- require.NoError(t, err)
- defer func() { require.NoError(t, nonMuxedConn.Close()) }()
-
- dialErr := nonMuxedConn.Invoke(ctx, "/Service/Method", &gitalypb.VoteTransactionRequest{}, &gitalypb.VoteTransactionResponse{})
- testhelper.RequireGrpcError(t, errMuxed, dialErr)
- })
-}
diff --git a/client/receive_pack.go b/internal/gitaly/client/receive_pack.go
index 7d081d4c0..7d081d4c0 100644
--- a/client/receive_pack.go
+++ b/internal/gitaly/client/receive_pack.go
diff --git a/client/upload_archive.go b/internal/gitaly/client/upload_archive.go
index 559eba93a..559eba93a 100644
--- a/client/upload_archive.go
+++ b/internal/gitaly/client/upload_archive.go
diff --git a/client/upload_pack.go b/internal/gitaly/client/upload_pack.go
index aeda65f6b..c6643d44c 100644
--- a/client/upload_pack.go
+++ b/internal/gitaly/client/upload_pack.go
@@ -4,6 +4,7 @@ import (
"context"
"io"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/sidechannel"
"gitlab.com/gitlab-org/gitaly/v15/internal/stream"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/v15/streamio"
@@ -46,7 +47,7 @@ func UploadPack(ctx context.Context, conn *grpc.ClientConn, stdin io.Reader, std
func UploadPackWithSidechannel(
ctx context.Context,
conn *grpc.ClientConn,
- reg *SidechannelRegistry,
+ reg *sidechannel.SidechannelRegistry,
stdin io.Reader,
stdout, stderr io.Writer,
req *gitalypb.SSHUploadPackWithSidechannelRequest,
@@ -54,7 +55,7 @@ func UploadPackWithSidechannel(
ctx, cancel := context.WithCancel(ctx)
defer cancel()
- ctx, wt := reg.Register(ctx, func(c SidechannelConn) error {
+ ctx, wt := reg.Register(ctx, func(c sidechannel.SidechannelConn) error {
return stream.ProxyPktLine(c, stdin, stdout, stderr)
})
defer func() {
diff --git a/internal/gitaly/server/auth/auth.go b/internal/gitaly/server/auth/auth.go
index 29c043dcb..22068c7a1 100644
--- a/internal/gitaly/server/auth/auth.go
+++ b/internal/gitaly/server/auth/auth.go
@@ -2,12 +2,12 @@ package auth
import (
"context"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/auth"
"time"
grpcmwauth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
gitalycfgauth "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/auth"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go
index 2c008077e..b3cede04b 100644
--- a/internal/gitaly/server/auth_test.go
+++ b/internal/gitaly/server/auth_test.go
@@ -7,6 +7,7 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/auth"
"io"
"net"
"testing"
@@ -14,7 +15,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
"gitlab.com/gitlab-org/gitaly/v15/client"
"gitlab.com/gitlab-org/gitaly/v15/internal/backchannel"
"gitlab.com/gitlab-org/gitaly/v15/internal/cache"
@@ -94,7 +94,7 @@ func TestAuthFailures(t *testing.T) {
},
{
desc: "wrong secret",
- opts: []grpc.DialOption{grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2("foobar"))},
+ opts: []grpc.DialOption{grpc.WithPerRPCCredentials(client.RPCCredentialsV2("foobar"))},
code: codes.PermissionDenied,
},
}
@@ -126,17 +126,17 @@ func TestAuthSuccess(t *testing.T) {
{desc: "no auth, not required"},
{
desc: "v2 correct auth, not required",
- opts: []grpc.DialOption{grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token))},
+ opts: []grpc.DialOption{grpc.WithPerRPCCredentials(client.RPCCredentialsV2(token))},
token: token,
},
{
desc: "v2 incorrect auth, not required",
- opts: []grpc.DialOption{grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2("incorrect"))},
+ opts: []grpc.DialOption{grpc.WithPerRPCCredentials(client.RPCCredentialsV2("incorrect"))},
token: token,
},
{
desc: "v2 correct auth, required",
- opts: []grpc.DialOption{grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token))},
+ opts: []grpc.DialOption{grpc.WithPerRPCCredentials(client.RPCCredentialsV2(token))},
token: token,
required: true,
},
@@ -180,7 +180,7 @@ func newOperationClient(t *testing.T, token, serverSocketPath string) (gitalypb.
connOpts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
- grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token)),
+ grpc.WithPerRPCCredentials(client.RPCCredentialsV2(token)),
}
conn, err := grpc.Dial(serverSocketPath, connOpts...)
require.NoError(t, err)
diff --git a/internal/gitaly/service/operations/testhelper_test.go b/internal/gitaly/service/operations/testhelper_test.go
index 336d8c2c6..6ddcdce7c 100644
--- a/internal/gitaly/service/operations/testhelper_test.go
+++ b/internal/gitaly/service/operations/testhelper_test.go
@@ -4,10 +4,10 @@ package operations
import (
"context"
+ gitalyauth "gitlab.com/gitlab-org/gitaly/v15/client"
"testing"
"github.com/stretchr/testify/require"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
internalclient "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
diff --git a/internal/gitaly/service/repository/create_fork_test.go b/internal/gitaly/service/repository/create_fork_test.go
index cb936e263..2804c813d 100644
--- a/internal/gitaly/service/repository/create_fork_test.go
+++ b/internal/gitaly/service/repository/create_fork_test.go
@@ -7,13 +7,13 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
+ gitalyx509 "gitlab.com/gitlab-org/gitaly/v15/client/internal/x509"
"os"
"path/filepath"
"strings"
"testing"
"github.com/stretchr/testify/require"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
"gitlab.com/gitlab-org/gitaly/v15/client"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
@@ -23,7 +23,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg"
- gitalyx509 "gitlab.com/gitlab-org/gitaly/v15/internal/x509"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -337,7 +336,7 @@ func newSecureRepoClient(tb testing.TB, addr, token string, pool *x509.CertPool)
RootCAs: pool,
MinVersion: tls.VersionTLS12,
})),
- grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token)),
+ grpc.WithPerRPCCredentials(client.RPCCredentialsV2(token)),
}
conn, err := client.Dial(addr, connOpts)
diff --git a/internal/gitaly/service/repository/testhelper_test.go b/internal/gitaly/service/repository/testhelper_test.go
index a4085ab02..c05c3251b 100644
--- a/internal/gitaly/service/repository/testhelper_test.go
+++ b/internal/gitaly/service/repository/testhelper_test.go
@@ -9,7 +9,6 @@ import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
gclient "gitlab.com/gitlab-org/gitaly/v15/client"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/stats"
@@ -41,7 +40,7 @@ func newRepositoryClient(tb testing.TB, cfg config.Cfg, serverSocketPath string)
internalclient.UnaryInterceptor(), internalclient.StreamInterceptor(),
}
if cfg.Auth.Token != "" {
- connOpts = append(connOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(cfg.Auth.Token)))
+ connOpts = append(connOpts, grpc.WithPerRPCCredentials(gclient.RPCCredentialsV2(cfg.Auth.Token)))
}
conn, err := gclient.Dial(serverSocketPath, connOpts)
require.NoError(tb, err)
@@ -60,7 +59,7 @@ func newObjectPoolClient(tb testing.TB, cfg config.Cfg, serverSocketPath string)
func newMuxedRepositoryClient(t *testing.T, ctx context.Context, cfg config.Cfg, serverSocketPath string, handshaker internalclient.Handshaker) gitalypb.RepositoryServiceClient {
conn, err := internalclient.Dial(ctx, serverSocketPath, []grpc.DialOption{
- grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(cfg.Auth.Token)),
+ grpc.WithPerRPCCredentials(gclient.RPCCredentialsV2(cfg.Auth.Token)),
}, handshaker)
require.NoError(t, err)
t.Cleanup(func() { conn.Close() })
diff --git a/internal/gitaly/service/server/info_test.go b/internal/gitaly/service/server/info_test.go
index 8f31fdb25..a220ac54d 100644
--- a/internal/gitaly/service/server/info_test.go
+++ b/internal/gitaly/service/server/info_test.go
@@ -3,10 +3,10 @@
package server
import (
+ gitalyauth "gitlab.com/gitlab-org/gitaly/v15/client"
"testing"
"github.com/stretchr/testify/require"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/auth"
diff --git a/internal/gitaly/service/smarthttp/testhelper_test.go b/internal/gitaly/service/smarthttp/testhelper_test.go
index f3a731af9..16df592ef 100644
--- a/internal/gitaly/service/smarthttp/testhelper_test.go
+++ b/internal/gitaly/service/smarthttp/testhelper_test.go
@@ -2,10 +2,10 @@ package smarthttp
import (
"context"
+ gitalyauth "gitlab.com/gitlab-org/gitaly/v15/client"
"testing"
"github.com/stretchr/testify/require"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
"gitlab.com/gitlab-org/gitaly/v15/internal/backchannel"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
diff --git a/internal/gitaly/service/smarthttp/upload_pack_test.go b/internal/gitaly/service/smarthttp/upload_pack_test.go
index a53da2294..c37e48cf4 100644
--- a/internal/gitaly/service/smarthttp/upload_pack_test.go
+++ b/internal/gitaly/service/smarthttp/upload_pack_test.go
@@ -6,6 +6,7 @@ import (
"encoding/binary"
"errors"
"fmt"
+ gitalyauth "gitlab.com/gitlab-org/gitaly/v15/client"
"io"
"path/filepath"
"sync"
@@ -14,7 +15,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
"gitlab.com/gitlab-org/gitaly/v15/internal/git"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/pktline"
diff --git a/internal/gitaly/service/ssh/upload_pack_test.go b/internal/gitaly/service/ssh/upload_pack_test.go
index 76aea6226..a9043fca9 100644
--- a/internal/gitaly/service/ssh/upload_pack_test.go
+++ b/internal/gitaly/service/ssh/upload_pack_test.go
@@ -6,6 +6,7 @@ import (
"bytes"
"context"
"fmt"
+ gitalyauth "gitlab.com/gitlab-org/gitaly/v15/client"
"io"
"os"
"os/exec"
@@ -17,7 +18,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
"gitlab.com/gitlab-org/gitaly/v15/internal/git"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
diff --git a/internal/middleware/metadatahandler/metadatahandler.go b/internal/middleware/metadatahandler/metadatahandler.go
index 4ce002a04..da5c77e69 100644
--- a/internal/middleware/metadatahandler/metadatahandler.go
+++ b/internal/middleware/metadatahandler/metadatahandler.go
@@ -2,13 +2,13 @@ package metadatahandler
import (
"context"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/auth"
"strings"
grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
"gitlab.com/gitlab-org/gitaly/v15/internal/structerr"
"gitlab.com/gitlab-org/labkit/correlation"
"google.golang.org/grpc"
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index ceecf6713..fe52bfc56 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -4,12 +4,12 @@ package praefect
import (
"context"
+ gitalyauth "gitlab.com/gitlab-org/gitaly/v15/client"
"net"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/auth"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index a64bf8571..1492fdfc4 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -5,13 +5,13 @@ import (
"database/sql"
"errors"
"fmt"
+ gitalyauth "gitlab.com/gitlab-org/gitaly/v15/client"
"math/rand"
"sync"
"time"
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/sirupsen/logrus"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
diff --git a/internal/praefect/nodes/ping.go b/internal/praefect/nodes/ping.go
index 8b1b67540..3264a4719 100644
--- a/internal/praefect/nodes/ping.go
+++ b/internal/praefect/nodes/ping.go
@@ -8,7 +8,6 @@ import (
"strings"
"sync"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
"gitlab.com/gitlab-org/gitaly/v15/client"
internalclient "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
@@ -78,7 +77,7 @@ func (p *Ping) dial(ctx context.Context) (*grpc.ClientConn, error) {
}
if len(p.token) > 0 {
- opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(p.token)))
+ opts = append(opts, grpc.WithPerRPCCredentials(client.RPCCredentialsV2(p.token)))
}
return client.DialContext(ctx, p.address, opts)
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 16604c057..f869359fb 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -4,6 +4,7 @@ package praefect
import (
"context"
+ gitalyauth "gitlab.com/gitlab-org/gitaly/v15/client"
"path/filepath"
"strings"
"sync"
@@ -16,7 +17,6 @@ import (
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
"gitlab.com/gitlab-org/gitaly/v15/internal/backchannel"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/housekeeping"
diff --git a/internal/praefect/service/checks.go b/internal/praefect/service/checks.go
index 37f51256c..d6b6a9212 100644
--- a/internal/praefect/service/checks.go
+++ b/internal/praefect/service/checks.go
@@ -10,7 +10,6 @@ import (
"time"
migrate "github.com/rubenv/sql-migrate"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
"gitlab.com/gitlab-org/gitaly/v15/client"
internalclient "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper"
@@ -267,7 +266,7 @@ func NewClockSyncCheck(clockDriftCheck func(ntpHost string, driftThreshold time.
internalclient.StreamInterceptor(),
}
if len(node.Token) > 0 {
- opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(node.Token)))
+ opts = append(opts, grpc.WithPerRPCCredentials(client.RPCCredentialsV2(node.Token)))
}
cc, err := client.DialContext(ctx, node.Address, opts)
diff --git a/internal/sidechannel/sidechannel.go b/internal/sidechannel/sidechannel.go
index fcd8d5258..c52cc718d 100644
--- a/internal/sidechannel/sidechannel.go
+++ b/internal/sidechannel/sidechannel.go
@@ -10,8 +10,8 @@ import (
"time"
"github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v15/client"
"gitlab.com/gitlab-org/gitaly/v15/internal/backchannel"
- "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client"
"gitlab.com/gitlab-org/gitaly/v15/internal/listenmux"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@@ -132,7 +132,7 @@ func NewServerHandshaker(registry *Registry) *ServerHandshaker {
// NewClientHandshaker is used to enable sidechannel support on outbound
// gRPC connections.
-func NewClientHandshaker(logger *logrus.Entry, registry *Registry) client.Handshaker {
+func NewClientHandshaker(logger *logrus.Entry, registry *Registry) backchannel.ClientHandshaker {
cfg := backchannel.DefaultConfiguration()
// If a client hangs up while the server is writing data to it then the
// server will block for 5 minutes by default before erroring out. This
@@ -150,3 +150,99 @@ func NewClientHandshaker(logger *logrus.Entry, registry *Registry) client.Handsh
cfg,
)
}
+
+// SidechannelRegistry associates sidechannel callbacks with outbound
+// gRPC calls.
+type SidechannelRegistry struct {
+ registry *Registry
+ logger *logrus.Entry
+}
+
+// NewSidechannelRegistry returns a new registry.
+func NewSidechannelRegistry(logger *logrus.Entry) *SidechannelRegistry {
+ return &SidechannelRegistry{
+ registry: NewRegistry(),
+ logger: logger,
+ }
+}
+
+// Register registers a callback. It adds metadata to ctx and returns the
+// new context. The caller must use the new context for the gRPC call.
+// Caller must Close() the returned SidechannelWaiter to prevent resource
+// leaks.
+func (sr *SidechannelRegistry) Register(
+ ctx context.Context,
+ callback func(SidechannelConn) error,
+) (context.Context, *SidechannelWaiter) {
+ ctx, waiter := RegisterSidechannel(
+ ctx,
+ sr.registry,
+ func(cc *ClientConn) error { return callback(cc) },
+ )
+ return ctx, &SidechannelWaiter{waiter: waiter}
+}
+
+// SidechannelWaiter represents a pending sidechannel and its callback.
+type SidechannelWaiter struct{ waiter *Waiter }
+
+// Close de-registers the sidechannel callback. If the callback is still
+// running, Close blocks until it is done and returns the error return
+// value of the callback. If the callback has not been called yet, Close
+// returns an error immediately.
+func (w *SidechannelWaiter) Close() error { return w.waiter.Close() }
+
+// SidechannelConn allows a client to read and write bytes with less
+// overhead than doing so via gRPC messages.
+type SidechannelConn interface {
+ io.ReadWriter
+
+ // CloseWrite tells the server we won't write any more data. We can still
+ // read data from the server after CloseWrite(). A typical use case is in
+ // an RPC where the byte stream has a request/response pattern: the
+ // client then uses CloseWrite() to signal the end of the request body.
+ // When the client calls CloseWrite(), the server receives EOF.
+ CloseWrite() error
+}
+
+// TestSidechannelServer allows downstream consumers of this package to
+// create mock sidechannel gRPC servers.
+func TestSidechannelServer(
+ logger *logrus.Entry,
+ creds credentials.TransportCredentials,
+ handler func(interface{}, grpc.ServerStream, io.ReadWriteCloser) error,
+) []grpc.ServerOption {
+ return []grpc.ServerOption{
+ SidechannelServer(logger, creds),
+ grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error {
+ conn, err := OpenServerSidechannel(stream.Context())
+ if err != nil {
+ return err
+ }
+ defer conn.Close()
+
+ return handler(srv, stream, conn)
+ }),
+ }
+}
+
+// SidechannelServer adds sidechannel support to a gRPC server
+func SidechannelServer(logger *logrus.Entry, creds credentials.TransportCredentials) grpc.ServerOption {
+ lm := listenmux.New(creds)
+ lm.Register(backchannel.NewServerHandshaker(logger, backchannel.NewRegistry(), nil))
+ return grpc.Creds(lm)
+}
+
+// OpenServerSidechannel opens a sidechannel on the server side. This
+// only works if the server was created using SidechannelServer().
+func OpenServerSidechannel(ctx context.Context) (io.ReadWriteCloser, error) {
+ return OpenSidechannel(ctx)
+}
+
+// Dial configures the dialer to establish a Gitaly
+// backchannel connection instead of a regular gRPC connection. It also
+// injects sr as a sidechannel registry, so that Gitaly can establish
+// sidechannels back to the client.
+func Dial(ctx context.Context, rawAddress string, sr *SidechannelRegistry, connOpts []grpc.DialOption) (*grpc.ClientConn, error) {
+ clientHandshaker := NewClientHandshaker(sr.logger, sr.registry)
+ return client.DialHandshaker(ctx, rawAddress, connOpts, clientHandshaker)
+}
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index 12da30497..2d83c1201 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -9,7 +9,6 @@ import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
"gitlab.com/gitlab-org/gitaly/v15/client"
"gitlab.com/gitlab-org/gitaly/v15/internal/backchannel"
"gitlab.com/gitlab-org/gitaly/v15/internal/cache"
@@ -133,7 +132,7 @@ func waitHealthy(tb testing.TB, ctx context.Context, addr string, authToken stri
internalclient.StreamInterceptor(),
}
if authToken != "" {
- grpcOpts = append(grpcOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(authToken)))
+ grpcOpts = append(grpcOpts, grpc.WithPerRPCCredentials(client.RPCCredentialsV2(authToken)))
}
conn, err := client.DialContext(ctx, addr, grpcOpts)
diff --git a/proto/go.mod b/proto/go.mod
new file mode 100644
index 000000000..b537c31c8
--- /dev/null
+++ b/proto/go.mod
@@ -0,0 +1,16 @@
+module gitlab.com/gitlab-org/gitaly/v15/proto
+
+go 1.18
+
+require (
+ google.golang.org/grpc v1.52.3
+ google.golang.org/protobuf v1.28.1
+)
+
+require (
+ github.com/golang/protobuf v1.5.2 // indirect
+ golang.org/x/net v0.4.0 // indirect
+ golang.org/x/sys v0.3.0 // indirect
+ golang.org/x/text v0.5.0 // indirect
+ google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 // indirect
+)
diff --git a/proto/go.sum b/proto/go.sum
new file mode 100644
index 000000000..5ec59cf4c
--- /dev/null
+++ b/proto/go.sum
@@ -0,0 +1,20 @@
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
+github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
+golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU=
+golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
+golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
+golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM=
+golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 h1:a2S6M0+660BgMNl++4JPlcAO/CjkqYItDEZwkoDQK7c=
+google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6/go.mod h1:rZS5c/ZVYMaOGBfO68GWtjOw/eLaZM1X6iVtgjZ+EWg=
+google.golang.org/grpc v1.52.3 h1:pf7sOysg4LdgBqduXveGKrcEwbStiK2rtfghdzlUYDQ=
+google.golang.org/grpc v1.52.3/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
+google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=