Welcome to mirror list, hosted at ThFree Co, Russian Federation.

pool.go « client - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: fa2261ae22db23ee32087ffa7fa42411b6b4a3fc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package client

import (
	"context"
	"errors"
	"fmt"
	"sync"

	gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
	"google.golang.org/grpc"
)

// Dialer is used by the Pool to create a *grpc.ClientConn.
type Dialer func(ctx context.Context, address string, dialOptions []grpc.DialOption) (*grpc.ClientConn, error)

type poolKey struct{ address, token string }

// Pool is a pool of GRPC connections. Connections created by it are safe for
// concurrent use.
type Pool struct {
	lock        sync.RWMutex
	conns       map[poolKey]*grpc.ClientConn
	dialer      Dialer
	dialOptions []grpc.DialOption
}

// NewPool creates a new connection pool that's ready for use.
func NewPool(dialOptions ...grpc.DialOption) *Pool {
	return NewPoolWithOptions(WithDialOptions(dialOptions...))
}

// NewPoolWithOptions creates a new connection pool that's ready for use.
func NewPoolWithOptions(poolOptions ...PoolOption) *Pool {
	opts := applyPoolOptions(poolOptions)
	return &Pool{
		conns:       make(map[poolKey]*grpc.ClientConn),
		dialer:      opts.dialer,
		dialOptions: opts.dialOptions,
	}
}

// Close closes all connections tracked by the connection pool.
func (p *Pool) Close() error {
	p.lock.Lock()
	defer p.lock.Unlock()

	var firstError error
	for addr, conn := range p.conns {
		if err := conn.Close(); err != nil && firstError == nil {
			firstError = err
		}

		delete(p.conns, addr)
	}

	return firstError
}

// Dial creates a new client connection in case no connection to the given
// address exists already or returns an already established connection. The
// returned address must not be `Close()`d.
func (p *Pool) Dial(ctx context.Context, address, token string) (*grpc.ClientConn, error) {
	return p.getOrCreateConnection(ctx, address, token)
}

func (p *Pool) getOrCreateConnection(ctx context.Context, address, token string) (*grpc.ClientConn, error) {
	if address == "" {
		return nil, errors.New("address is empty")
	}

	key := poolKey{address: address, token: token}

	p.lock.RLock()
	cc, ok := p.conns[key]
	p.lock.RUnlock()

	if ok {
		return cc, nil
	}

	p.lock.Lock()
	defer p.lock.Unlock()

	cc, ok = p.conns[key]
	if ok {
		return cc, nil
	}

	opts := make([]grpc.DialOption, 0, len(p.dialOptions)+1)
	opts = append(opts, p.dialOptions...)
	if token != "" {
		opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token)))
	}

	cc, err := p.dialer(ctx, address, opts)
	if err != nil {
		return nil, fmt.Errorf("could not dial source: %v", err)
	}

	p.conns[key] = cc

	return cc, nil
}