diff options
Diffstat (limited to 'client')
-rw-r--r-- | client/pool.go | 90 | ||||
-rw-r--r-- | client/pool_options.go | 39 |
2 files changed, 20 insertions, 109 deletions
diff --git a/client/pool.go b/client/pool.go index ec430f73e..1354f726c 100644 --- a/client/pool.go +++ b/client/pool.go @@ -2,26 +2,30 @@ package client import ( "context" - "errors" - "fmt" - "sync" - gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "google.golang.org/grpc" ) +// PoolOption is an option that can be passed to NewPoolWithOptions. +type PoolOption = client.PoolOption + // Dialer is used by the Pool to create a *grpc.ClientConn. type Dialer func(ctx context.Context, address string, dialOptions []grpc.DialOption) (*grpc.ClientConn, error) -type poolKey struct{ address, token string } +// WithDialer sets the dialer that is called for each new gRPC connection the pool establishes. +func WithDialer(dialer Dialer) PoolOption { + return client.WithDialer(client.Dialer(dialer)) +} + +// WithDialOptions sets gRPC options to use for the gRPC Dial call. +func WithDialOptions(dialOptions ...grpc.DialOption) PoolOption { + return client.WithDialOptions(dialOptions...) +} -// Pool is a pool of GRPC connections. Connections created by it are safe for -// concurrent use. +// Pool is a pool of GRPC connections. Connections created by it are safe for concurrent use. type Pool struct { - lock sync.RWMutex - conns map[poolKey]*grpc.ClientConn - dialer Dialer - dialOptions []grpc.DialOption + pool *client.Pool } // NewPool creates a new connection pool that's ready for use. @@ -31,73 +35,19 @@ func NewPool(dialOptions ...grpc.DialOption) *Pool { // NewPoolWithOptions creates a new connection pool that's ready for use. func NewPoolWithOptions(poolOptions ...PoolOption) *Pool { - opts := applyPoolOptions(poolOptions) return &Pool{ - conns: make(map[poolKey]*grpc.ClientConn), - dialer: opts.dialer, - dialOptions: opts.dialOptions, - } -} - -// Close closes all connections tracked by the connection pool. -func (p *Pool) Close() error { - p.lock.Lock() - defer p.lock.Unlock() - - var firstError error - for addr, conn := range p.conns { - if err := conn.Close(); err != nil && firstError == nil { - firstError = err - } - - delete(p.conns, addr) + pool: client.NewPool(poolOptions...), } - - return firstError } // Dial creates a new client connection in case no connection to the given // address exists already or returns an already established connection. The // returned address must not be `Close()`d. func (p *Pool) Dial(ctx context.Context, address, token string) (*grpc.ClientConn, error) { - return p.getOrCreateConnection(ctx, address, token) + return p.pool.Dial(ctx, address, token) } -func (p *Pool) getOrCreateConnection(ctx context.Context, address, token string) (*grpc.ClientConn, error) { - if address == "" { - return nil, errors.New("address is empty") - } - - key := poolKey{address: address, token: token} - - p.lock.RLock() - cc, ok := p.conns[key] - p.lock.RUnlock() - - if ok { - return cc, nil - } - - p.lock.Lock() - defer p.lock.Unlock() - - cc, ok = p.conns[key] - if ok { - return cc, nil - } - - opts := make([]grpc.DialOption, 0, len(p.dialOptions)+1) - opts = append(opts, p.dialOptions...) - if token != "" { - opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token))) - } - - cc, err := p.dialer(ctx, address, opts) - if err != nil { - return nil, fmt.Errorf("could not dial source: %w", err) - } - - p.conns[key] = cc - - return cc, nil +// Close closes all connections tracked by the connection pool. +func (p *Pool) Close() error { + return p.pool.Close() } diff --git a/client/pool_options.go b/client/pool_options.go deleted file mode 100644 index a8139c1f2..000000000 --- a/client/pool_options.go +++ /dev/null @@ -1,39 +0,0 @@ -package client - -import "google.golang.org/grpc" - -type poolOptions struct { - dialer Dialer - dialOptions []grpc.DialOption -} - -//nolint:revive // This is unintentionally missing documentation. -type PoolOption func(*poolOptions) - -func applyPoolOptions(options []PoolOption) *poolOptions { - opts := defaultPoolOptions() - for _, opt := range options { - opt(opts) - } - return opts -} - -func defaultPoolOptions() *poolOptions { - return &poolOptions{ - dialer: DialContext, - } -} - -// WithDialer sets the dialer that is called for each new gRPC connection the pool establishes. -func WithDialer(dialer Dialer) PoolOption { - return func(options *poolOptions) { - options.dialer = dialer - } -} - -// WithDialOptions sets gRPC options to use for the gRPC Dial call. -func WithDialOptions(dialOptions ...grpc.DialOption) PoolOption { - return func(options *poolOptions) { - options.dialOptions = dialOptions - } -} |