diff options
author | Paul Okstad <pokstad@gitlab.com> | 2020-10-16 06:36:52 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2020-10-16 06:36:52 +0300 |
commit | 3bdd23173595a931aac476ad0c07c702c30f4391 (patch) | |
tree | 9cf7e64641d4ec8d047a0866352d9a4bad5e03e1 | |
parent | b4b8b03d923647b9e5d42c5f746db1647f7c06ee (diff) | |
parent | e3f8d0f5af28f2dd0d9ac3a1d82dd6cbb7718250 (diff) |
Merge branch 'ash2k/dial-opts-func' into 'master'
Allow to specify a dialer for the connection pool to use
See merge request gitlab-org/gitaly!2630
-rw-r--r-- | changelogs/unreleased/ash2k-dial-opts-func.yml | 5 | ||||
-rw-r--r-- | client/pool.go | 15 | ||||
-rw-r--r-- | client/pool_options.go | 38 | ||||
-rw-r--r-- | client/pool_test.go | 39 |
4 files changed, 92 insertions, 5 deletions
diff --git a/changelogs/unreleased/ash2k-dial-opts-func.yml b/changelogs/unreleased/ash2k-dial-opts-func.yml new file mode 100644 index 000000000..7727ccd48 --- /dev/null +++ b/changelogs/unreleased/ash2k-dial-opts-func.yml @@ -0,0 +1,5 @@ +--- +title: Per-connection gRPC options in client +merge_request: 2630 +author: +type: added diff --git a/client/pool.go b/client/pool.go index 00e6ed05f..e29622e05 100644 --- a/client/pool.go +++ b/client/pool.go @@ -10,19 +10,30 @@ import ( "google.golang.org/grpc" ) +// Dialer is used by the Pool to create a *grpc.ClientConn. +type Dialer func(ctx context.Context, address string, dialOptions []grpc.DialOption) (*grpc.ClientConn, error) + // Pool is a pool of GRPC connections. Connections created by it are safe for // concurrent use. type Pool struct { lock sync.RWMutex connsByAddress map[string]*grpc.ClientConn + dialer Dialer dialOptions []grpc.DialOption } // NewPool creates a new connection pool that's ready for use. func NewPool(dialOptions ...grpc.DialOption) *Pool { + return NewPoolWithOptions(WithDialOptions(dialOptions...)) +} + +// NewPool creates a new connection pool that's ready for use. +func NewPoolWithOptions(poolOptions ...PoolOption) *Pool { + opts := applyPoolOptions(poolOptions) return &Pool{ connsByAddress: make(map[string]*grpc.ClientConn), - dialOptions: dialOptions, + dialer: opts.dialer, + dialOptions: opts.dialOptions, } } @@ -77,7 +88,7 @@ func (p *Pool) getOrCreateConnection(ctx context.Context, address, token string) opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token))) } - cc, err := DialContext(ctx, address, opts) + cc, err := p.dialer(ctx, address, opts) if err != nil { return nil, fmt.Errorf("could not dial source: %v", err) } diff --git a/client/pool_options.go b/client/pool_options.go new file mode 100644 index 000000000..facd08bf0 --- /dev/null +++ b/client/pool_options.go @@ -0,0 +1,38 @@ +package client + +import "google.golang.org/grpc" + +type poolOptions struct { + dialer Dialer + dialOptions []grpc.DialOption +} + +type PoolOption func(*poolOptions) + +func applyPoolOptions(options []PoolOption) *poolOptions { + opts := defaultPoolOptions() + for _, opt := range options { + opt(opts) + } + return opts +} + +func defaultPoolOptions() *poolOptions { + return &poolOptions{ + dialer: DialContext, + } +} + +// WithDialer sets the dialer that is called for each new gRPC connection the pool establishes. +func WithDialer(dialer Dialer) PoolOption { + return func(options *poolOptions) { + options.dialer = dialer + } +} + +// WithDialOptions sets gRPC options to use for the gRPC Dial call. +func WithDialOptions(dialOptions ...grpc.DialOption) PoolOption { + return func(options *poolOptions) { + options.dialOptions = dialOptions + } +} diff --git a/client/pool_test.go b/client/pool_test.go index 5a58e2793..216d82067 100644 --- a/client/pool_test.go +++ b/client/pool_test.go @@ -8,7 +8,9 @@ import ( "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitaly_auth "gitlab.com/gitlab-org/gitaly/internal/gitaly/config/auth" "gitlab.com/gitlab-org/gitaly/internal/gitaly/server/auth" "gitlab.com/gitlab-org/gitaly/internal/testhelper" @@ -27,9 +29,12 @@ func TestPoolDial(t *testing.T) { secure, cleanup := runServer(t, creds) defer cleanup() + var dialFuncInvocationCounter int + testCases := []struct { - desc string - test func(t *testing.T, ctx context.Context, pool *Pool) + desc string + poolOptions []PoolOption + test func(t *testing.T, ctx context.Context, pool *Pool) }{ { desc: "dialing once succeeds", @@ -122,11 +127,39 @@ func TestPoolDial(t *testing.T) { verifyConnection(t, conn, codes.Unauthenticated) }, }, + { + desc: "dialing with dial options succeeds", + poolOptions: []PoolOption{ + WithDialOptions(grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(creds))), + }, + test: func(t *testing.T, ctx context.Context, pool *Pool) { + conn, err := pool.Dial(ctx, secure, "") // no creds here + require.NoError(t, err) + verifyConnection(t, conn, codes.OK) // auth passes + }, + }, + { + desc: "dial options function is invoked per dial", + poolOptions: []PoolOption{ + WithDialer(func(ctx context.Context, address string, dialOptions []grpc.DialOption) (*grpc.ClientConn, error) { + dialFuncInvocationCounter++ + return DialContext(ctx, address, dialOptions) + }), + }, + test: func(t *testing.T, ctx context.Context, pool *Pool) { + _, err := pool.Dial(ctx, secure, "") + require.NoError(t, err) + assert.Equal(t, 1, dialFuncInvocationCounter) + _, err = pool.Dial(ctx, insecure, "") + require.NoError(t, err) + assert.Equal(t, 2, dialFuncInvocationCounter) + }, + }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - pool := NewPool() + pool := NewPoolWithOptions(tc.poolOptions...) defer func() { require.NoError(t, pool.Close()) }() |