diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-09-06 12:58:05 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-09-07 09:34:27 +0300 |
commit | c50cf9a578f6abd5a7e85a445bd560ec59ab8d51 (patch) | |
tree | 33dcb4b10654b517e0a2bf63f625568f5b6a880e /client | |
parent | 3c81c66fb8011f21e613ea1cce24a2619317c363 (diff) |
client: Move `Pool` logic into internal package
Move the `Pool` logic into our internal client package. Like this we can
reuse the functionality internally without having to rely on our public
interface, which allows us to iterate on its internal implementation
without breaking the public API.
Diffstat (limited to 'client')
-rw-r--r-- | client/pool.go | 90 | ||||
-rw-r--r-- | client/pool_options.go | 39 |
2 files changed, 20 insertions, 109 deletions
diff --git a/client/pool.go b/client/pool.go index ec430f73e..1354f726c 100644 --- a/client/pool.go +++ b/client/pool.go @@ -2,26 +2,30 @@ package client import ( "context" - "errors" - "fmt" - "sync" - gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "google.golang.org/grpc" ) +// PoolOption is an option that can be passed to NewPoolWithOptions. +type PoolOption = client.PoolOption + // Dialer is used by the Pool to create a *grpc.ClientConn. type Dialer func(ctx context.Context, address string, dialOptions []grpc.DialOption) (*grpc.ClientConn, error) -type poolKey struct{ address, token string } +// WithDialer sets the dialer that is called for each new gRPC connection the pool establishes. +func WithDialer(dialer Dialer) PoolOption { + return client.WithDialer(client.Dialer(dialer)) +} + +// WithDialOptions sets gRPC options to use for the gRPC Dial call. +func WithDialOptions(dialOptions ...grpc.DialOption) PoolOption { + return client.WithDialOptions(dialOptions...) +} -// Pool is a pool of GRPC connections. Connections created by it are safe for -// concurrent use. +// Pool is a pool of GRPC connections. Connections created by it are safe for concurrent use. type Pool struct { - lock sync.RWMutex - conns map[poolKey]*grpc.ClientConn - dialer Dialer - dialOptions []grpc.DialOption + pool *client.Pool } // NewPool creates a new connection pool that's ready for use. @@ -31,73 +35,19 @@ func NewPool(dialOptions ...grpc.DialOption) *Pool { // NewPoolWithOptions creates a new connection pool that's ready for use. func NewPoolWithOptions(poolOptions ...PoolOption) *Pool { - opts := applyPoolOptions(poolOptions) return &Pool{ - conns: make(map[poolKey]*grpc.ClientConn), - dialer: opts.dialer, - dialOptions: opts.dialOptions, - } -} - -// Close closes all connections tracked by the connection pool. -func (p *Pool) Close() error { - p.lock.Lock() - defer p.lock.Unlock() - - var firstError error - for addr, conn := range p.conns { - if err := conn.Close(); err != nil && firstError == nil { - firstError = err - } - - delete(p.conns, addr) + pool: client.NewPool(poolOptions...), } - - return firstError } // Dial creates a new client connection in case no connection to the given // address exists already or returns an already established connection. The // returned address must not be `Close()`d. func (p *Pool) Dial(ctx context.Context, address, token string) (*grpc.ClientConn, error) { - return p.getOrCreateConnection(ctx, address, token) + return p.pool.Dial(ctx, address, token) } -func (p *Pool) getOrCreateConnection(ctx context.Context, address, token string) (*grpc.ClientConn, error) { - if address == "" { - return nil, errors.New("address is empty") - } - - key := poolKey{address: address, token: token} - - p.lock.RLock() - cc, ok := p.conns[key] - p.lock.RUnlock() - - if ok { - return cc, nil - } - - p.lock.Lock() - defer p.lock.Unlock() - - cc, ok = p.conns[key] - if ok { - return cc, nil - } - - opts := make([]grpc.DialOption, 0, len(p.dialOptions)+1) - opts = append(opts, p.dialOptions...) - if token != "" { - opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token))) - } - - cc, err := p.dialer(ctx, address, opts) - if err != nil { - return nil, fmt.Errorf("could not dial source: %w", err) - } - - p.conns[key] = cc - - return cc, nil +// Close closes all connections tracked by the connection pool. +func (p *Pool) Close() error { + return p.pool.Close() } diff --git a/client/pool_options.go b/client/pool_options.go deleted file mode 100644 index a8139c1f2..000000000 --- a/client/pool_options.go +++ /dev/null @@ -1,39 +0,0 @@ -package client - -import "google.golang.org/grpc" - -type poolOptions struct { - dialer Dialer - dialOptions []grpc.DialOption -} - -//nolint:revive // This is unintentionally missing documentation. -type PoolOption func(*poolOptions) - -func applyPoolOptions(options []PoolOption) *poolOptions { - opts := defaultPoolOptions() - for _, opt := range options { - opt(opts) - } - return opts -} - -func defaultPoolOptions() *poolOptions { - return &poolOptions{ - dialer: DialContext, - } -} - -// WithDialer sets the dialer that is called for each new gRPC connection the pool establishes. -func WithDialer(dialer Dialer) PoolOption { - return func(options *poolOptions) { - options.dialer = dialer - } -} - -// WithDialOptions sets gRPC options to use for the gRPC Dial call. -func WithDialOptions(dialOptions ...grpc.DialOption) PoolOption { - return func(options *poolOptions) { - options.dialOptions = dialOptions - } -} |