Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/client
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2023-09-06 12:58:05 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2023-09-07 09:34:27 +0300
commitc50cf9a578f6abd5a7e85a445bd560ec59ab8d51 (patch)
tree33dcb4b10654b517e0a2bf63f625568f5b6a880e /client
parent3c81c66fb8011f21e613ea1cce24a2619317c363 (diff)
client: Move `Pool` logic into internal package
Move the `Pool` logic into our internal client package. Like this we can reuse the functionality internally without having to rely on our public interface, which allows us to iterate on its internal implementation without breaking the public API.
Diffstat (limited to 'client')
-rw-r--r--client/pool.go90
-rw-r--r--client/pool_options.go39
2 files changed, 20 insertions, 109 deletions
diff --git a/client/pool.go b/client/pool.go
index ec430f73e..1354f726c 100644
--- a/client/pool.go
+++ b/client/pool.go
@@ -2,26 +2,30 @@ package client
import (
"context"
- "errors"
- "fmt"
- "sync"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"google.golang.org/grpc"
)
+// PoolOption is an option that can be passed to NewPoolWithOptions.
+type PoolOption = client.PoolOption
+
// Dialer is used by the Pool to create a *grpc.ClientConn.
type Dialer func(ctx context.Context, address string, dialOptions []grpc.DialOption) (*grpc.ClientConn, error)
-type poolKey struct{ address, token string }
+// WithDialer sets the dialer that is called for each new gRPC connection the pool establishes.
+func WithDialer(dialer Dialer) PoolOption {
+ return client.WithDialer(client.Dialer(dialer))
+}
+
+// WithDialOptions sets gRPC options to use for the gRPC Dial call.
+func WithDialOptions(dialOptions ...grpc.DialOption) PoolOption {
+ return client.WithDialOptions(dialOptions...)
+}
-// Pool is a pool of GRPC connections. Connections created by it are safe for
-// concurrent use.
+// Pool is a pool of GRPC connections. Connections created by it are safe for concurrent use.
type Pool struct {
- lock sync.RWMutex
- conns map[poolKey]*grpc.ClientConn
- dialer Dialer
- dialOptions []grpc.DialOption
+ pool *client.Pool
}
// NewPool creates a new connection pool that's ready for use.
@@ -31,73 +35,19 @@ func NewPool(dialOptions ...grpc.DialOption) *Pool {
// NewPoolWithOptions creates a new connection pool that's ready for use.
func NewPoolWithOptions(poolOptions ...PoolOption) *Pool {
- opts := applyPoolOptions(poolOptions)
return &Pool{
- conns: make(map[poolKey]*grpc.ClientConn),
- dialer: opts.dialer,
- dialOptions: opts.dialOptions,
- }
-}
-
-// Close closes all connections tracked by the connection pool.
-func (p *Pool) Close() error {
- p.lock.Lock()
- defer p.lock.Unlock()
-
- var firstError error
- for addr, conn := range p.conns {
- if err := conn.Close(); err != nil && firstError == nil {
- firstError = err
- }
-
- delete(p.conns, addr)
+ pool: client.NewPool(poolOptions...),
}
-
- return firstError
}
// Dial creates a new client connection in case no connection to the given
// address exists already or returns an already established connection. The
// returned address must not be `Close()`d.
func (p *Pool) Dial(ctx context.Context, address, token string) (*grpc.ClientConn, error) {
- return p.getOrCreateConnection(ctx, address, token)
+ return p.pool.Dial(ctx, address, token)
}
-func (p *Pool) getOrCreateConnection(ctx context.Context, address, token string) (*grpc.ClientConn, error) {
- if address == "" {
- return nil, errors.New("address is empty")
- }
-
- key := poolKey{address: address, token: token}
-
- p.lock.RLock()
- cc, ok := p.conns[key]
- p.lock.RUnlock()
-
- if ok {
- return cc, nil
- }
-
- p.lock.Lock()
- defer p.lock.Unlock()
-
- cc, ok = p.conns[key]
- if ok {
- return cc, nil
- }
-
- opts := make([]grpc.DialOption, 0, len(p.dialOptions)+1)
- opts = append(opts, p.dialOptions...)
- if token != "" {
- opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token)))
- }
-
- cc, err := p.dialer(ctx, address, opts)
- if err != nil {
- return nil, fmt.Errorf("could not dial source: %w", err)
- }
-
- p.conns[key] = cc
-
- return cc, nil
+// Close closes all connections tracked by the connection pool.
+func (p *Pool) Close() error {
+ return p.pool.Close()
}
diff --git a/client/pool_options.go b/client/pool_options.go
deleted file mode 100644
index a8139c1f2..000000000
--- a/client/pool_options.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package client
-
-import "google.golang.org/grpc"
-
-type poolOptions struct {
- dialer Dialer
- dialOptions []grpc.DialOption
-}
-
-//nolint:revive // This is unintentionally missing documentation.
-type PoolOption func(*poolOptions)
-
-func applyPoolOptions(options []PoolOption) *poolOptions {
- opts := defaultPoolOptions()
- for _, opt := range options {
- opt(opts)
- }
- return opts
-}
-
-func defaultPoolOptions() *poolOptions {
- return &poolOptions{
- dialer: DialContext,
- }
-}
-
-// WithDialer sets the dialer that is called for each new gRPC connection the pool establishes.
-func WithDialer(dialer Dialer) PoolOption {
- return func(options *poolOptions) {
- options.dialer = dialer
- }
-}
-
-// WithDialOptions sets gRPC options to use for the gRPC Dial call.
-func WithDialOptions(dialOptions ...grpc.DialOption) PoolOption {
- return func(options *poolOptions) {
- options.dialOptions = dialOptions
- }
-}