Welcome to mirror list, hosted at ThFree Co, Russian Federation.

client_connections.go « conn « praefect « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: c3734937a732ad435832a9a8d54aaf2ec28f91bc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package conn

import (
	"errors"
	"sync"

	gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
	"gitlab.com/gitlab-org/gitaly/client"
	"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
	"google.golang.org/grpc"
)

// Sentinel errors reported by ClientConnections.
var (
	// ErrConnectionNotFound is returned when no client connection is
	// registered for the requested storage.
	ErrConnectionNotFound = errors.New("client connection not found")

	// ErrAlreadyRegistered is returned when a client connection is already
	// registered for the given storage.
	ErrAlreadyRegistered = errors.New("client connection already registered")
)

// ClientConnections holds the grpc client connections that have been
// registered, one per storage name. The map is read and written under
// connMutex by the methods of this type.
type ClientConnections struct {
	connMutex sync.RWMutex                // guards nodes
	nodes     map[string]*grpc.ClientConn // client connections keyed by storage name
}

// NewClientConnections returns an empty ClientConnections whose connection
// map is initialised and ready to accept registrations.
func NewClientConnections() *ClientConnections {
	registry := new(ClientConnections)
	registry.nodes = map[string]*grpc.ClientConn{}
	return registry
}

// RegisterNode dials listenAddr and registers the resulting connection under
// storageName, so that it can later be retrieved with GetConnection.
//
// The connection is dialed with the proxy codec as the default call codec and
// with per-RPC credentials built from token. It returns the dial error if
// dialing fails, or ErrAlreadyRegistered if storageName already has a
// connection; in the latter case the newly dialed connection is closed.
func (c *ClientConnections) RegisterNode(storageName, listenAddr, token string) error {
	conn, err := client.Dial(listenAddr,
		[]grpc.DialOption{
			grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
			grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(token)),
		},
	)
	if err != nil {
		return err
	}

	if err := c.setConn(storageName, conn); err != nil {
		// The connection was never stored, so nothing else could close it.
		// The Close error is intentionally dropped in favour of the
		// registration error, which is what the caller needs to see.
		_ = conn.Close()
		return err
	}

	return nil
}

// setConn stores conn under storageName. If storageName already has a
// connection, the existing entry is left untouched and ErrAlreadyRegistered
// is returned.
func (c *ClientConnections) setConn(storageName string, conn *grpc.ClientConn) error {
	c.connMutex.Lock()
	// Deferred so the lock is released on the early-return path as well;
	// returning while still holding it would block every later caller.
	defer c.connMutex.Unlock()

	if _, ok := c.nodes[storageName]; ok {
		return ErrAlreadyRegistered
	}
	c.nodes[storageName] = conn

	return nil
}

// GetConnection looks up the grpc client connection registered under
// storageName. It returns ErrConnectionNotFound when no connection has been
// registered for that name.
func (c *ClientConnections) GetConnection(storageName string) (*grpc.ClientConn, error) {
	c.connMutex.RLock()
	defer c.connMutex.RUnlock()

	if conn, found := c.nodes[storageName]; found {
		return conn, nil
	}
	return nil, ErrConnectionNotFound
}