Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal/transaction/txinfo/server.go')
-rw-r--r--internal/transaction/txinfo/server.go247
1 files changed, 247 insertions, 0 deletions
diff --git a/internal/transaction/txinfo/server.go b/internal/transaction/txinfo/server.go
new file mode 100644
index 000000000..de9c2cc0a
--- /dev/null
+++ b/internal/transaction/txinfo/server.go
@@ -0,0 +1,247 @@
+package txinfo
+
+import (
+ "context"
+ "encoding/base64"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net"
+ "net/url"
+
+ "gitlab.com/gitlab-org/gitaly/internal/backchannel"
+ "gitlab.com/gitlab-org/gitaly/internal/bootstrap/starter"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/peer"
+)
+
+const (
+ // PraefectMetadataKey is the key used to store Praefect server
+ // information in the gRPC metadata.
+ PraefectMetadataKey = "gitaly-praefect-server"
+)
+
+var (
+ // ErrPraefectServerNotFound indicates the Praefect server metadata
+ // could not be found
+ ErrPraefectServerNotFound = errors.New("metadata for Praefect server not found")
+)
+
+// PraefectServer stores parameters required to connect to a Praefect server
+type PraefectServer struct {
+ // BackchannelID identifies the backchannel that corresponds to the Praefect server
+ // that sent the request and should receive the vote. This field is actually filled
+ // in by the Gitaly.
+ BackchannelID backchannel.ID `json:"backchannel_id,omitempty"`
+ // ListenAddr is the TCP listen address of the Praefect server
+ ListenAddr string `json:"listen_addr"`
+ // TLSListenAddr is the TCP listen address of the Praefect server with TLS support
+ TLSListenAddr string `json:"tls_listen_addr"`
+ // SocketPath is the Unix socket path of the Praefect server
+ SocketPath string `json:"socket_path"`
+ // Token is the token required to authenticate with the Praefect server
+ Token string `json:"token"`
+}
+
+// PraefectFromConfig creates a Praefect server for a given configuration.
+func PraefectFromConfig(conf config.Config) (*PraefectServer, error) {
+ praefectServer := PraefectServer{Token: conf.Auth.Token}
+
+ addrBySchema := map[string]*string{
+ starter.TCP: &praefectServer.ListenAddr,
+ starter.TLS: &praefectServer.TLSListenAddr,
+ starter.Unix: &praefectServer.SocketPath,
+ }
+
+ for _, endpoint := range []struct {
+ schema string
+ addr string
+ }{
+ {schema: starter.TCP, addr: conf.ListenAddr},
+ {schema: starter.TLS, addr: conf.TLSListenAddr},
+ {schema: starter.Unix, addr: conf.SocketPath},
+ } {
+ if endpoint.addr == "" {
+ continue
+ }
+
+ parsed, err := starter.ParseEndpoint(endpoint.addr)
+ if err != nil {
+ if !errors.Is(err, starter.ErrEmptySchema) {
+ return nil, err
+ }
+ parsed = starter.Config{Name: endpoint.schema, Addr: endpoint.addr}
+ }
+
+ addr, err := parsed.Endpoint()
+ if err != nil {
+ return nil, fmt.Errorf("processing of %s: %w", endpoint.schema, err)
+ }
+
+ *addrBySchema[endpoint.schema] = addr
+ }
+
+ return &praefectServer, nil
+}
+
+// Inject injects Praefect connection metadata into an incoming context
+func (p *PraefectServer) Inject(ctx context.Context) (context.Context, error) {
+ serialized, err := p.serialize()
+ if err != nil {
+ return nil, err
+ }
+
+ md, ok := metadata.FromIncomingContext(ctx)
+ if !ok {
+ md = metadata.New(map[string]string{})
+ } else {
+ md = md.Copy()
+ }
+ md.Set(PraefectMetadataKey, serialized)
+
+ return metadata.NewIncomingContext(ctx, md), nil
+}
+
+// Resolve Praefect address based on its peer information. Depending on how
+// Praefect reached out to us, we'll adjust the PraefectServer to contain
+// either its Unix or TCP address.
+func (p *PraefectServer) resolvePraefectAddress(peer *peer.Peer) error {
+ switch addr := peer.Addr.(type) {
+ case *net.UnixAddr:
+ if p.SocketPath == "" {
+ return errors.New("resolvePraefectAddress: got Unix peer but no socket path")
+ }
+
+ p.ListenAddr = ""
+ p.TLSListenAddr = ""
+
+ return nil
+ case *net.TCPAddr:
+ var authType string
+ if peer.AuthInfo != nil {
+ authType = peer.AuthInfo.AuthType()
+ }
+
+ switch authType {
+ case "", backchannel.Insecure().Info().SecurityProtocol:
+ // no transport security being used
+ addr, err := substituteListeningWithIP(p.ListenAddr, addr.IP.String())
+ if err != nil {
+ return fmt.Errorf("resolvePraefectAddress: for ListenAddr: %w", err)
+ }
+
+ p.ListenAddr = addr
+ p.TLSListenAddr = ""
+ p.SocketPath = ""
+
+ return nil
+ default:
+ authType := peer.AuthInfo.AuthType()
+ if authType != (credentials.TLSInfo{}).AuthType() {
+ return fmt.Errorf("resolvePraefectAddress: got TCP peer but with unknown transport security type %q", authType)
+ }
+
+ addr, err := substituteListeningWithIP(p.TLSListenAddr, addr.IP.String())
+ if err != nil {
+ return fmt.Errorf("resolvePraefectAddress: for TLSListenAddr: %w", err)
+ }
+
+ p.TLSListenAddr = addr
+ p.ListenAddr = ""
+ p.SocketPath = ""
+
+ return nil
+ }
+ default:
+ return fmt.Errorf("resolvePraefectAddress: unknown peer address scheme: %s", peer.Addr.Network())
+ }
+}
+
+func substituteListeningWithIP(listenAddr, ip string) (string, error) {
+ if listenAddr == "" {
+ return "", errors.New("listening address is empty")
+ }
+
+ // We need to replace Praefect's IP address with the peer's
+ // address as the value we have is from Praefect's configuration,
+ // which may be a wildcard IP address ("0.0.0.0").
+ listenURL, err := url.Parse(listenAddr)
+ if err != nil {
+ return "", fmt.Errorf("parse listening address %q: %w", listenAddr, err)
+ }
+
+ listenURL.Host = net.JoinHostPort(ip, listenURL.Port())
+ return listenURL.String(), nil
+}
+
+// PraefectFromContext extracts `PraefectServer` from an incoming context. In
+// case the metadata key is not set, the function will return `ErrPraefectServerNotFound`.
+func PraefectFromContext(ctx context.Context) (*PraefectServer, error) {
+ md, ok := metadata.FromIncomingContext(ctx)
+ if !ok {
+ return nil, ErrPraefectServerNotFound
+ }
+
+ serialized := md[PraefectMetadataKey]
+ if len(serialized) == 0 {
+ return nil, ErrPraefectServerNotFound
+ }
+
+ praefect, err := praefectFromSerialized(serialized[0])
+ if err != nil {
+ return nil, err
+ }
+
+ peer, ok := peer.FromContext(ctx)
+ if !ok {
+ return nil, fmt.Errorf("PraefectFromContext: could not get peer")
+ }
+
+ if err := praefect.resolvePraefectAddress(peer); err != nil {
+ return nil, err
+ }
+
+ praefect.BackchannelID, err = backchannel.GetPeerID(ctx)
+ if err != nil && !errors.Is(err, backchannel.ErrNonMultiplexedConnection) {
+ return nil, fmt.Errorf("get peer id: %w", err)
+ }
+
+ return praefect, nil
+}
+
+func (p *PraefectServer) serialize() (string, error) {
+ marshalled, err := json.Marshal(p)
+ if err != nil {
+ return "", err
+ }
+
+ return base64.StdEncoding.EncodeToString(marshalled), nil
+}
+
+// praefectFromSerialized creates a Praefect server from a `serialize()`d string.
+func praefectFromSerialized(serialized string) (*PraefectServer, error) {
+ decoded, err := base64.StdEncoding.DecodeString(serialized)
+ if err != nil {
+ return nil, err
+ }
+
+ var server PraefectServer
+ if err := json.Unmarshal(decoded, &server); err != nil {
+ return nil, err
+ }
+
+ return &server, nil
+}
+
+// Address returns the address of the Praefect server which can be used to connect to it.
+func (p *PraefectServer) Address() (string, error) {
+ for _, addr := range []string{p.SocketPath, p.TLSListenAddr, p.ListenAddr} {
+ if addr != "" {
+ return addr, nil
+ }
+ }
+
+ return "", errors.New("no address configured")
+}