diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2020-06-16 11:30:25 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2020-06-16 11:30:25 +0300 |
commit | 76bde563c7d6fb5964b5953e97b0f05750e6ac6c (patch) | |
tree | 10c664ad8b26c42b0de23b1624f3ad306870376f | |
parent | 5985ed4c28d1025099fdcc0e9f65fa03de68a29f (diff) | |
parent | 8abc867b19575a01da6ca9939c7e1ad4fd8d44b6 (diff) |
Merge branch 'pks-2pc-praefect-injection' into 'master'
Adjust Praefect server address based on peer info
Closes #2793
See merge request gitlab-org/gitaly!2273
-rw-r--r-- | changelogs/unreleased/pks-2pc-praefect-injection.yml | 5 | ||||
-rw-r--r-- | internal/git/receivepack.go | 16 | ||||
-rw-r--r-- | internal/praefect/metadata/server.go | 99 | ||||
-rw-r--r-- | internal/praefect/metadata/server_test.go | 137 | ||||
-rw-r--r-- | internal/service/hook/pre_receive.go | 11 |
5 files changed, 236 insertions, 32 deletions
diff --git a/changelogs/unreleased/pks-2pc-praefect-injection.yml b/changelogs/unreleased/pks-2pc-praefect-injection.yml new file mode 100644 index 000000000..63264d321 --- /dev/null +++ b/changelogs/unreleased/pks-2pc-praefect-injection.yml @@ -0,0 +1,5 @@ +--- +title: Adjust Praefect server address based on peer info +merge_request: 2273 +author: +type: fixed diff --git a/internal/git/receivepack.go b/internal/git/receivepack.go index 2d27d620d..b8c649827 100644 --- a/internal/git/receivepack.go +++ b/internal/git/receivepack.go @@ -51,26 +51,24 @@ func ReceivePackHookEnv(ctx context.Context, req ReceivePackRequest) ([]string, fmt.Sprintf("%s=%s", featureflag.GoPreReceiveHookEnvVar, strconv.FormatBool(featureflag.IsEnabled(ctx, featureflag.GoPreReceiveHook))), }, gitlabshellEnv...) - praefect, err := metadata.ExtractPraefectServer(ctx) + transaction, err := metadata.ExtractTransaction(ctx) if err == nil { - praefectEnv, err := praefect.Env() + praefect, err := metadata.PraefectFromContext(ctx) if err != nil { return nil, err } - env = append(env, praefectEnv) - } else if !errors.Is(err, metadata.ErrPraefectServerNotFound) { - return nil, err - } + praefectEnv, err := praefect.Env() + if err != nil { + return nil, err + } - transaction, err := metadata.ExtractTransaction(ctx) - if err == nil { transactionEnv, err := transaction.Env() if err != nil { return nil, err } - env = append(env, transactionEnv) + env = append(env, praefectEnv, transactionEnv) } else if !errors.Is(err, metadata.ErrTransactionNotFound) { return nil, err } diff --git a/internal/praefect/metadata/server.go b/internal/praefect/metadata/server.go index 0de0d7a7a..1c88a6b9e 100644 --- a/internal/praefect/metadata/server.go +++ b/internal/praefect/metadata/server.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "net" "strings" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" @@ -13,6 +14,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "google.golang.org/grpc" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/peer" ) const ( @@ -32,24 +34,20 @@ var ( // PraefectServer stores parameters required to connect to a Praefect server type PraefectServer struct { - // Address is the address of the Praefect server - Address string `json:"address"` + // ListenAddr is the TCP listen address of the Praefect server + ListenAddr string `json:"listen_addr"` + // SocketPath is the Unix socket path of the Praefect server + SocketPath string `json:"socket_path"` // Token is the token required to authenticate with the Praefect server Token string `json:"token"` } // InjectPraefectServer injects Praefect connection metadata into an incoming context func InjectPraefectServer(ctx context.Context, conf config.Config) (context.Context, error) { - var address string - if conf.ListenAddr != "" { - address = conf.ListenAddr - } else if conf.SocketPath != "" { - address = "unix://" + conf.SocketPath - } - praefectServer := PraefectServer{ - Address: address, - Token: conf.Auth.Token, + ListenAddr: strings.TrimPrefix(conf.ListenAddr, "tcp://"), + SocketPath: strings.TrimPrefix(conf.SocketPath, "unix://"), + Token: conf.Auth.Token, } marshalled, err := json.Marshal(praefectServer) @@ -68,9 +66,43 @@ func InjectPraefectServer(ctx context.Context, conf config.Config) (context.Cont return metadata.NewIncomingContext(ctx, md), nil } -// ExtractPraefectServer extracts `PraefectServer` from an incoming context. In +// Resolve Praefect address based on its peer information. Depending on how +// Praefect reached out to us, we'll adjust the PraefectServer to contain +// either its Unix or TCP address. +func (p *PraefectServer) resolvePraefectAddress(peer *peer.Peer) error { + switch addr := peer.Addr.(type) { + case *net.UnixAddr: + if p.SocketPath == "" { + return errors.New("resolvePraefectAddress: got Unix peer but no socket path") + } + + p.ListenAddr = "" + return nil + case *net.TCPAddr: + if p.ListenAddr == "" { + return errors.New("resolvePraefectAddress: got TCP peer but no listen address") + } + + // We need to replace Praefect's IP address with the peer's + // address as the value we have is from Praefect's configuration, + // which may be a wildcard IP address ("0.0.0.0"). + _, port, err := net.SplitHostPort(p.ListenAddr) + if err != nil { + return fmt.Errorf("resolvePraefectAddress: could not resolve address %q: %w", p.ListenAddr, err) + } + + p.ListenAddr = net.JoinHostPort(addr.IP.String(), port) + p.SocketPath = "" + + return nil + default: + return fmt.Errorf("resolvePraefectAddress: unknown peer address scheme: %s", peer.Addr.Network()) + } +} + +// PraefectFromContext extracts `PraefectServer` from an incoming context. In // case the metadata key is not set, the function will return `ErrPraefectServerNotFound`. -func ExtractPraefectServer(ctx context.Context) (p *PraefectServer, err error) { +func PraefectFromContext(ctx context.Context) (*PraefectServer, error) { md, ok := metadata.FromIncomingContext(ctx) if !ok { return nil, ErrPraefectServerNotFound @@ -83,14 +115,24 @@ func ExtractPraefectServer(ctx context.Context) (p *PraefectServer, err error) { decoded, err := base64.StdEncoding.DecodeString(encoded[0]) if err != nil { - return nil, fmt.Errorf("failed decoding base64: %v", err) + return nil, fmt.Errorf("PraefectFromContext: %w", err) } - if err := json.Unmarshal(decoded, &p); err != nil { - return nil, fmt.Errorf("failed unmarshalling json: %v", err) + var praefect PraefectServer + if err := json.Unmarshal(decoded, &praefect); err != nil { + return nil, fmt.Errorf("PraefectFromContext: %w", err) } - return + peer, ok := peer.FromContext(ctx) + if !ok { + return nil, fmt.Errorf("PraefectFromContext: could not get peer") + } + + if err := praefect.resolvePraefectAddress(peer); err != nil { + return nil, err + } + + return &praefect, nil } // PraefectFromEnv extracts `PraefectServer` from the environment variable @@ -123,7 +165,7 @@ func PraefectFromEnv(envvars []string) (*PraefectServer, error) { } // Env encodes the `PraefectServer` and returns an environment variable. -func (p PraefectServer) Env() (string, error) { +func (p *PraefectServer) Env() (string, error) { marshalled, err := json.Marshal(p) if err != nil { return "", err @@ -133,8 +175,20 @@ func (p PraefectServer) Env() (string, error) { return fmt.Sprintf("%s=%s", PraefectEnvKey, encoded), nil } +func (p *PraefectServer) Address() (string, error) { + if p.SocketPath != "" { + return "unix://" + p.SocketPath, nil + } + + if p.ListenAddr != "" { + return "tcp://" + p.ListenAddr, nil + } + + return "", errors.New("no address configured") +} + // Dial will try to connect to the given Praefect server -func (p PraefectServer) Dial(ctx context.Context) (*grpc.ClientConn, error) { +func (p *PraefectServer) Dial(ctx context.Context) (*grpc.ClientConn, error) { opts := []grpc.DialOption{ grpc.WithBlock(), } @@ -142,5 +196,10 @@ func (p PraefectServer) Dial(ctx context.Context) (*grpc.ClientConn, error) { opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(p.Token))) } - return client.DialContext(ctx, p.Address, opts) + address, err := p.Address() + if err != nil { + return nil, err + } + + return client.DialContext(ctx, address, opts) } diff --git a/internal/praefect/metadata/server_test.go b/internal/praefect/metadata/server_test.go new file mode 100644 index 000000000..649d6e5c9 --- /dev/null +++ b/internal/praefect/metadata/server_test.go @@ -0,0 +1,137 @@ +package metadata + +import ( + "net" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "google.golang.org/grpc/peer" +) + +func tcpPeer(t *testing.T, ip string, port int) *peer.Peer { + parsedAddress := net.ParseIP(ip) + require.NotNil(t, parsedAddress) + + return &peer.Peer{ + Addr: &net.TCPAddr{ + IP: parsedAddress, + Port: port, + }, + } +} + +func unixPeer(t *testing.T, socket string) *peer.Peer { + return &peer.Peer{ + Addr: &net.UnixAddr{ + Name: socket, + }, + } +} + +func TestPraefect_InjectMetadata(t *testing.T) { + testcases := []struct { + desc string + listenAddress string + socketPath string + peer *peer.Peer + expectedAddress string + }{ + { + desc: "wildcard listen address", + listenAddress: "0.0.0.0:1234", + peer: tcpPeer(t, "1.2.3.4", 4321), + expectedAddress: "tcp://1.2.3.4:1234", + }, + { + desc: "explicit listen address", + listenAddress: "127.0.0.1:1234", + peer: tcpPeer(t, "1.2.3.4", 4321), + expectedAddress: "tcp://1.2.3.4:1234", + }, + { + desc: "explicit listen address with explicit prefix", + listenAddress: "tcp://127.0.0.1:1234", + peer: tcpPeer(t, "1.2.3.4", 4321), + expectedAddress: "tcp://1.2.3.4:1234", + }, + { + desc: "named host listen address", + listenAddress: "example.com:1234", + peer: tcpPeer(t, "1.2.3.4", 4321), + expectedAddress: "tcp://1.2.3.4:1234", + }, + { + desc: "named host listen address with IPv6 peer", + listenAddress: "example.com:1234", + peer: tcpPeer(t, "2001:1db8:ac10:fe01::", 4321), + expectedAddress: "tcp://[2001:1db8:ac10:fe01::]:1234", + }, + { + desc: "Unix socket path", + socketPath: "/tmp/socket", + peer: unixPeer(t, "@"), + expectedAddress: "unix:///tmp/socket", + }, + { + desc: "Unix socket path with explicit prefix", + socketPath: "unix:///tmp/socket", + peer: unixPeer(t, "@"), + expectedAddress: "unix:///tmp/socket", + }, + { + desc: "both addresses configured with TCP peer", + listenAddress: "0.0.0.0:1234", + socketPath: "/tmp/socket", + peer: tcpPeer(t, "1.2.3.4", 4321), + expectedAddress: "tcp://1.2.3.4:1234", + }, + { + desc: "both addresses configured with Unix peer", + listenAddress: "0.0.0.0:1234", + socketPath: "/tmp/socket", + peer: unixPeer(t, "@"), + expectedAddress: "unix:///tmp/socket", + }, + { + desc: "listen address with Unix peer", + listenAddress: "0.0.0.0:1234", + peer: unixPeer(t, "@"), + expectedAddress: "", + }, + { + desc: "socket path with TCP peer", + socketPath: "/tmp/socket", + peer: tcpPeer(t, "1.2.3.4", 4321), + expectedAddress: "", + }, + } + + for _, tc := range testcases { + t.Run(tc.desc, func(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + cfg := config.Config{ + ListenAddr: tc.listenAddress, + SocketPath: tc.socketPath, + } + + ctx = peer.NewContext(ctx, tc.peer) + ctx, err := InjectPraefectServer(ctx, cfg) + require.NoError(t, err) + + server, err := PraefectFromContext(ctx) + if tc.expectedAddress == "" { + require.Error(t, err) + } else { + require.NoError(t, err) + + address, err := server.Address() + require.NoError(t, err) + require.Equal(t, tc.expectedAddress, address) + } + }) + } +} diff --git a/internal/service/hook/pre_receive.go b/internal/service/hook/pre_receive.go index 610622c58..d1410a268 100644 --- a/internal/service/hook/pre_receive.go +++ b/internal/service/hook/pre_receive.go @@ -64,8 +64,13 @@ func gitlabShellHook(hookName string) string { } func (s *server) getPraefectConn(ctx context.Context, server *metadata.PraefectServer) (*grpc.ClientConn, error) { + address, err := server.Address() + if err != nil { + return nil, err + } + s.mutex.RLock() - conn, ok := s.praefectConnPool[server.Address] + conn, ok := s.praefectConnPool[address] s.mutex.RUnlock() if ok { @@ -75,7 +80,7 @@ func (s *server) getPraefectConn(ctx context.Context, server *metadata.PraefectS s.mutex.Lock() defer s.mutex.Unlock() - conn, ok = s.praefectConnPool[server.Address] + conn, ok = s.praefectConnPool[address] if !ok { var err error conn, err = server.Dial(ctx) @@ -83,7 +88,7 @@ func (s *server) getPraefectConn(ctx context.Context, server *metadata.PraefectS return nil, err } - s.praefectConnPool[server.Address] = conn + s.praefectConnPool[address] = conn } return conn, nil |