Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2020-06-16 11:30:25 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2020-06-16 11:30:25 +0300
commit76bde563c7d6fb5964b5953e97b0f05750e6ac6c (patch)
tree10c664ad8b26c42b0de23b1624f3ad306870376f
parent5985ed4c28d1025099fdcc0e9f65fa03de68a29f (diff)
parent8abc867b19575a01da6ca9939c7e1ad4fd8d44b6 (diff)
Merge branch 'pks-2pc-praefect-injection' into 'master'
Adjust Praefect server address based on peer info Closes #2793 See merge request gitlab-org/gitaly!2273
-rw-r--r--changelogs/unreleased/pks-2pc-praefect-injection.yml5
-rw-r--r--internal/git/receivepack.go16
-rw-r--r--internal/praefect/metadata/server.go99
-rw-r--r--internal/praefect/metadata/server_test.go137
-rw-r--r--internal/service/hook/pre_receive.go11
5 files changed, 236 insertions, 32 deletions
diff --git a/changelogs/unreleased/pks-2pc-praefect-injection.yml b/changelogs/unreleased/pks-2pc-praefect-injection.yml
new file mode 100644
index 000000000..63264d321
--- /dev/null
+++ b/changelogs/unreleased/pks-2pc-praefect-injection.yml
@@ -0,0 +1,5 @@
+---
+title: Adjust Praefect server address based on peer info
+merge_request: 2273
+author:
+type: fixed
diff --git a/internal/git/receivepack.go b/internal/git/receivepack.go
index 2d27d620d..b8c649827 100644
--- a/internal/git/receivepack.go
+++ b/internal/git/receivepack.go
@@ -51,26 +51,24 @@ func ReceivePackHookEnv(ctx context.Context, req ReceivePackRequest) ([]string,
fmt.Sprintf("%s=%s", featureflag.GoPreReceiveHookEnvVar, strconv.FormatBool(featureflag.IsEnabled(ctx, featureflag.GoPreReceiveHook))),
}, gitlabshellEnv...)
- praefect, err := metadata.ExtractPraefectServer(ctx)
+ transaction, err := metadata.ExtractTransaction(ctx)
if err == nil {
- praefectEnv, err := praefect.Env()
+ praefect, err := metadata.PraefectFromContext(ctx)
if err != nil {
return nil, err
}
- env = append(env, praefectEnv)
- } else if !errors.Is(err, metadata.ErrPraefectServerNotFound) {
- return nil, err
- }
+ praefectEnv, err := praefect.Env()
+ if err != nil {
+ return nil, err
+ }
- transaction, err := metadata.ExtractTransaction(ctx)
- if err == nil {
transactionEnv, err := transaction.Env()
if err != nil {
return nil, err
}
- env = append(env, transactionEnv)
+ env = append(env, praefectEnv, transactionEnv)
} else if !errors.Is(err, metadata.ErrTransactionNotFound) {
return nil, err
}
diff --git a/internal/praefect/metadata/server.go b/internal/praefect/metadata/server.go
index 0de0d7a7a..1c88a6b9e 100644
--- a/internal/praefect/metadata/server.go
+++ b/internal/praefect/metadata/server.go
@@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "net"
"strings"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
@@ -13,6 +14,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/peer"
)
const (
@@ -32,24 +34,20 @@ var (
// PraefectServer stores parameters required to connect to a Praefect server
type PraefectServer struct {
- // Address is the address of the Praefect server
- Address string `json:"address"`
+ // ListenAddr is the TCP listen address of the Praefect server
+ ListenAddr string `json:"listen_addr"`
+ // SocketPath is the Unix socket path of the Praefect server
+ SocketPath string `json:"socket_path"`
// Token is the token required to authenticate with the Praefect server
Token string `json:"token"`
}
// InjectPraefectServer injects Praefect connection metadata into an incoming context
func InjectPraefectServer(ctx context.Context, conf config.Config) (context.Context, error) {
- var address string
- if conf.ListenAddr != "" {
- address = conf.ListenAddr
- } else if conf.SocketPath != "" {
- address = "unix://" + conf.SocketPath
- }
-
praefectServer := PraefectServer{
- Address: address,
- Token: conf.Auth.Token,
+ ListenAddr: strings.TrimPrefix(conf.ListenAddr, "tcp://"),
+ SocketPath: strings.TrimPrefix(conf.SocketPath, "unix://"),
+ Token: conf.Auth.Token,
}
marshalled, err := json.Marshal(praefectServer)
@@ -68,9 +66,43 @@ func InjectPraefectServer(ctx context.Context, conf config.Config) (context.Cont
return metadata.NewIncomingContext(ctx, md), nil
}
-// ExtractPraefectServer extracts `PraefectServer` from an incoming context. In
+// Resolve Praefect address based on its peer information. Depending on how
+// Praefect reached out to us, we'll adjust the PraefectServer to contain
+// either its Unix or TCP address.
+func (p *PraefectServer) resolvePraefectAddress(peer *peer.Peer) error {
+ switch addr := peer.Addr.(type) {
+ case *net.UnixAddr:
+ if p.SocketPath == "" {
+ return errors.New("resolvePraefectAddress: got Unix peer but no socket path")
+ }
+
+ p.ListenAddr = ""
+ return nil
+ case *net.TCPAddr:
+ if p.ListenAddr == "" {
+ return errors.New("resolvePraefectAddress: got TCP peer but no listen address")
+ }
+
+ // We need to replace Praefect's IP address with the peer's
+ // address as the value we have is from Praefect's configuration,
+ // which may be a wildcard IP address ("0.0.0.0").
+ _, port, err := net.SplitHostPort(p.ListenAddr)
+ if err != nil {
+ return fmt.Errorf("resolvePraefectAddress: could not resolve address %q: %w", p.ListenAddr, err)
+ }
+
+ p.ListenAddr = net.JoinHostPort(addr.IP.String(), port)
+ p.SocketPath = ""
+
+ return nil
+ default:
+ return fmt.Errorf("resolvePraefectAddress: unknown peer address scheme: %s", peer.Addr.Network())
+ }
+}
+
+// PraefectFromContext extracts `PraefectServer` from an incoming context. In
// case the metadata key is not set, the function will return `ErrPraefectServerNotFound`.
-func ExtractPraefectServer(ctx context.Context) (p *PraefectServer, err error) {
+func PraefectFromContext(ctx context.Context) (*PraefectServer, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, ErrPraefectServerNotFound
@@ -83,14 +115,24 @@ func ExtractPraefectServer(ctx context.Context) (p *PraefectServer, err error) {
decoded, err := base64.StdEncoding.DecodeString(encoded[0])
if err != nil {
- return nil, fmt.Errorf("failed decoding base64: %v", err)
+ return nil, fmt.Errorf("PraefectFromContext: %w", err)
}
- if err := json.Unmarshal(decoded, &p); err != nil {
- return nil, fmt.Errorf("failed unmarshalling json: %v", err)
+ var praefect PraefectServer
+ if err := json.Unmarshal(decoded, &praefect); err != nil {
+ return nil, fmt.Errorf("PraefectFromContext: %w", err)
}
- return
+ peer, ok := peer.FromContext(ctx)
+ if !ok {
+ return nil, fmt.Errorf("PraefectFromContext: could not get peer")
+ }
+
+ if err := praefect.resolvePraefectAddress(peer); err != nil {
+ return nil, err
+ }
+
+ return &praefect, nil
}
// PraefectFromEnv extracts `PraefectServer` from the environment variable
@@ -123,7 +165,7 @@ func PraefectFromEnv(envvars []string) (*PraefectServer, error) {
}
// Env encodes the `PraefectServer` and returns an environment variable.
-func (p PraefectServer) Env() (string, error) {
+func (p *PraefectServer) Env() (string, error) {
marshalled, err := json.Marshal(p)
if err != nil {
return "", err
@@ -133,8 +175,20 @@ func (p PraefectServer) Env() (string, error) {
return fmt.Sprintf("%s=%s", PraefectEnvKey, encoded), nil
}
+func (p *PraefectServer) Address() (string, error) {
+ if p.SocketPath != "" {
+ return "unix://" + p.SocketPath, nil
+ }
+
+ if p.ListenAddr != "" {
+ return "tcp://" + p.ListenAddr, nil
+ }
+
+ return "", errors.New("no address configured")
+}
+
// Dial will try to connect to the given Praefect server
-func (p PraefectServer) Dial(ctx context.Context) (*grpc.ClientConn, error) {
+func (p *PraefectServer) Dial(ctx context.Context) (*grpc.ClientConn, error) {
opts := []grpc.DialOption{
grpc.WithBlock(),
}
@@ -142,5 +196,10 @@ func (p PraefectServer) Dial(ctx context.Context) (*grpc.ClientConn, error) {
opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(p.Token)))
}
- return client.DialContext(ctx, p.Address, opts)
+ address, err := p.Address()
+ if err != nil {
+ return nil, err
+ }
+
+ return client.DialContext(ctx, address, opts)
}
diff --git a/internal/praefect/metadata/server_test.go b/internal/praefect/metadata/server_test.go
new file mode 100644
index 000000000..649d6e5c9
--- /dev/null
+++ b/internal/praefect/metadata/server_test.go
@@ -0,0 +1,137 @@
+package metadata
+
+import (
+ "net"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "google.golang.org/grpc/peer"
+)
+
+func tcpPeer(t *testing.T, ip string, port int) *peer.Peer {
+ parsedAddress := net.ParseIP(ip)
+ require.NotNil(t, parsedAddress)
+
+ return &peer.Peer{
+ Addr: &net.TCPAddr{
+ IP: parsedAddress,
+ Port: port,
+ },
+ }
+}
+
+func unixPeer(t *testing.T, socket string) *peer.Peer {
+ return &peer.Peer{
+ Addr: &net.UnixAddr{
+ Name: socket,
+ },
+ }
+}
+
+func TestPraefect_InjectMetadata(t *testing.T) {
+ testcases := []struct {
+ desc string
+ listenAddress string
+ socketPath string
+ peer *peer.Peer
+ expectedAddress string
+ }{
+ {
+ desc: "wildcard listen address",
+ listenAddress: "0.0.0.0:1234",
+ peer: tcpPeer(t, "1.2.3.4", 4321),
+ expectedAddress: "tcp://1.2.3.4:1234",
+ },
+ {
+ desc: "explicit listen address",
+ listenAddress: "127.0.0.1:1234",
+ peer: tcpPeer(t, "1.2.3.4", 4321),
+ expectedAddress: "tcp://1.2.3.4:1234",
+ },
+ {
+ desc: "explicit listen address with explicit prefix",
+ listenAddress: "tcp://127.0.0.1:1234",
+ peer: tcpPeer(t, "1.2.3.4", 4321),
+ expectedAddress: "tcp://1.2.3.4:1234",
+ },
+ {
+ desc: "named host listen address",
+ listenAddress: "example.com:1234",
+ peer: tcpPeer(t, "1.2.3.4", 4321),
+ expectedAddress: "tcp://1.2.3.4:1234",
+ },
+ {
+ desc: "named host listen address with IPv6 peer",
+ listenAddress: "example.com:1234",
+ peer: tcpPeer(t, "2001:1db8:ac10:fe01::", 4321),
+ expectedAddress: "tcp://[2001:1db8:ac10:fe01::]:1234",
+ },
+ {
+ desc: "Unix socket path",
+ socketPath: "/tmp/socket",
+ peer: unixPeer(t, "@"),
+ expectedAddress: "unix:///tmp/socket",
+ },
+ {
+ desc: "Unix socket path with explicit prefix",
+ socketPath: "unix:///tmp/socket",
+ peer: unixPeer(t, "@"),
+ expectedAddress: "unix:///tmp/socket",
+ },
+ {
+ desc: "both addresses configured with TCP peer",
+ listenAddress: "0.0.0.0:1234",
+ socketPath: "/tmp/socket",
+ peer: tcpPeer(t, "1.2.3.4", 4321),
+ expectedAddress: "tcp://1.2.3.4:1234",
+ },
+ {
+ desc: "both addresses configured with Unix peer",
+ listenAddress: "0.0.0.0:1234",
+ socketPath: "/tmp/socket",
+ peer: unixPeer(t, "@"),
+ expectedAddress: "unix:///tmp/socket",
+ },
+ {
+ desc: "listen address with Unix peer",
+ listenAddress: "0.0.0.0:1234",
+ peer: unixPeer(t, "@"),
+ expectedAddress: "",
+ },
+ {
+ desc: "socket path with TCP peer",
+ socketPath: "/tmp/socket",
+ peer: tcpPeer(t, "1.2.3.4", 4321),
+ expectedAddress: "",
+ },
+ }
+
+ for _, tc := range testcases {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ cfg := config.Config{
+ ListenAddr: tc.listenAddress,
+ SocketPath: tc.socketPath,
+ }
+
+ ctx = peer.NewContext(ctx, tc.peer)
+ ctx, err := InjectPraefectServer(ctx, cfg)
+ require.NoError(t, err)
+
+ server, err := PraefectFromContext(ctx)
+ if tc.expectedAddress == "" {
+ require.Error(t, err)
+ } else {
+ require.NoError(t, err)
+
+ address, err := server.Address()
+ require.NoError(t, err)
+ require.Equal(t, tc.expectedAddress, address)
+ }
+ })
+ }
+}
diff --git a/internal/service/hook/pre_receive.go b/internal/service/hook/pre_receive.go
index 610622c58..d1410a268 100644
--- a/internal/service/hook/pre_receive.go
+++ b/internal/service/hook/pre_receive.go
@@ -64,8 +64,13 @@ func gitlabShellHook(hookName string) string {
}
func (s *server) getPraefectConn(ctx context.Context, server *metadata.PraefectServer) (*grpc.ClientConn, error) {
+ address, err := server.Address()
+ if err != nil {
+ return nil, err
+ }
+
s.mutex.RLock()
- conn, ok := s.praefectConnPool[server.Address]
+ conn, ok := s.praefectConnPool[address]
s.mutex.RUnlock()
if ok {
@@ -75,7 +80,7 @@ func (s *server) getPraefectConn(ctx context.Context, server *metadata.PraefectS
s.mutex.Lock()
defer s.mutex.Unlock()
- conn, ok = s.praefectConnPool[server.Address]
+ conn, ok = s.praefectConnPool[address]
if !ok {
var err error
conn, err = server.Dial(ctx)
@@ -83,7 +88,7 @@ func (s *server) getPraefectConn(ctx context.Context, server *metadata.PraefectS
return nil, err
}
- s.praefectConnPool[server.Address] = conn
+ s.praefectConnPool[address] = conn
}
return conn, nil