Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/client
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2020-12-03 18:23:31 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2020-12-07 14:53:11 +0300
commit5041b60c0bd9f4fec1d9e565daba073274b0763d (patch)
tree4fcfd829350f95f555d779fdc10cc41eb040b0e5 /client
parenta4987c02548c6663eb527b7a1f95a987d2be6d61 (diff)
Connections pool should respect token as part of the key
It could be possible the same address will require a new token to be used. This could be possible in case of the new token rollout. The simplest approach is to make a token part of the key in the connections pool. With that the same address will be able to have multiple connections with different token. Closes: https://gitlab.com/gitlab-org/gitaly/-/issues/2699
Diffstat (limited to 'client')
-rw-r--r--client/pool.go28
-rw-r--r--client/pool_test.go51
2 files changed, 58 insertions, 21 deletions
diff --git a/client/pool.go b/client/pool.go
index e29622e05..84067f4ff 100644
--- a/client/pool.go
+++ b/client/pool.go
@@ -13,13 +13,15 @@ import (
// Dialer is used by the Pool to create a *grpc.ClientConn.
type Dialer func(ctx context.Context, address string, dialOptions []grpc.DialOption) (*grpc.ClientConn, error)
+type poolKey struct{ address, token string }
+
// Pool is a pool of GRPC connections. Connections created by it are safe for
// concurrent use.
type Pool struct {
- lock sync.RWMutex
- connsByAddress map[string]*grpc.ClientConn
- dialer Dialer
- dialOptions []grpc.DialOption
+ lock sync.RWMutex
+ conns map[poolKey]*grpc.ClientConn
+ dialer Dialer
+ dialOptions []grpc.DialOption
}
// NewPool creates a new connection pool that's ready for use.
@@ -31,9 +33,9 @@ func NewPool(dialOptions ...grpc.DialOption) *Pool {
func NewPoolWithOptions(poolOptions ...PoolOption) *Pool {
opts := applyPoolOptions(poolOptions)
return &Pool{
- connsByAddress: make(map[string]*grpc.ClientConn),
- dialer: opts.dialer,
- dialOptions: opts.dialOptions,
+ conns: make(map[poolKey]*grpc.ClientConn),
+ dialer: opts.dialer,
+ dialOptions: opts.dialOptions,
}
}
@@ -43,12 +45,12 @@ func (p *Pool) Close() error {
defer p.lock.Unlock()
var firstError error
- for addr, conn := range p.connsByAddress {
+ for addr, conn := range p.conns {
if err := conn.Close(); err != nil && firstError == nil {
firstError = err
}
- delete(p.connsByAddress, addr)
+ delete(p.conns, addr)
}
return firstError
@@ -66,8 +68,10 @@ func (p *Pool) getOrCreateConnection(ctx context.Context, address, token string)
return nil, errors.New("address is empty")
}
+ key := poolKey{address: address, token: token}
+
p.lock.RLock()
- cc, ok := p.connsByAddress[address]
+ cc, ok := p.conns[key]
p.lock.RUnlock()
if ok {
@@ -77,7 +81,7 @@ func (p *Pool) getOrCreateConnection(ctx context.Context, address, token string)
p.lock.Lock()
defer p.lock.Unlock()
- cc, ok = p.connsByAddress[address]
+ cc, ok = p.conns[key]
if ok {
return cc, nil
}
@@ -93,7 +97,7 @@ func (p *Pool) getOrCreateConnection(ctx context.Context, address, token string)
return nil, fmt.Errorf("could not dial source: %v", err)
}
- p.connsByAddress[address] = cc
+ p.conns[key] = cc
return cc, nil
}
diff --git a/client/pool_test.go b/client/pool_test.go
index 216d82067..e025daa70 100644
--- a/client/pool_test.go
+++ b/client/pool_test.go
@@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
+ "gitlab.com/gitlab-org/gitaly/internal/bootstrap/starter"
gitaly_auth "gitlab.com/gitlab-org/gitaly/internal/gitaly/config/auth"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/server/auth"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
@@ -22,11 +23,11 @@ import (
)
func TestPoolDial(t *testing.T) {
- insecure, cleanup := runServer(t, "")
+ _, insecure, cleanup := runServer(t, "")
defer cleanup()
creds := "my-little-secret"
- secure, cleanup := runServer(t, creds)
+ _, secure, cleanup := runServer(t, creds)
defer cleanup()
var dialFuncInvocationCounter int
@@ -172,7 +173,11 @@ func TestPoolDial(t *testing.T) {
}
}
-func runServer(t *testing.T, creds string) (string, func()) {
+func runServer(t *testing.T, creds string) (*health.Server, string, func()) {
+ return runServerWithAddr(t, creds, "127.0.0.1:0")
+}
+
+func runServerWithAddr(t *testing.T, creds, addr string) (*health.Server, string, func()) {
t.Helper()
var opts []grpc.ServerOption
@@ -195,9 +200,8 @@ func runServer(t *testing.T, creds string) (string, func()) {
healthServer := health.NewServer()
grpc_health_v1.RegisterHealthServer(server, healthServer)
- healthServer.SetServingStatus("TestService", grpc_health_v1.HealthCheckResponse_SERVING)
- listener, err := net.Listen("tcp", "127.0.0.1:0")
+ listener, err := net.Listen("tcp", addr)
require.NoError(t, err)
errQ := make(chan error)
@@ -205,7 +209,7 @@ func runServer(t *testing.T, creds string) (string, func()) {
errQ <- server.Serve(listener)
}()
- return "tcp://" + listener.Addr().String(), func() {
+ return healthServer, "tcp://" + listener.Addr().String(), func() {
server.Stop()
require.NoError(t, <-errQ)
}
@@ -217,9 +221,7 @@ func verifyConnection(t *testing.T, conn *grpc.ClientConn, expectedCode codes.Co
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
- _, err := grpc_health_v1.NewHealthClient(conn).Check(ctx, &grpc_health_v1.HealthCheckRequest{
- Service: "TestService",
- })
+ _, err := grpc_health_v1.NewHealthClient(conn).Check(ctx, &grpc_health_v1.HealthCheckRequest{})
if expectedCode == codes.OK {
require.NoError(t, err)
@@ -227,3 +229,34 @@ func verifyConnection(t *testing.T, conn *grpc.ClientConn, expectedCode codes.Co
require.Equal(t, expectedCode, status.Code(err))
}
}
+
+func TestPool_Dial_same_addr_another_token(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ _, addr, stop1 := runServer(t, "")
+ defer func() { stop1() }()
+
+ pool := NewPool()
+ defer pool.Close()
+
+ // all good - server is running and serving requests
+ conn, err := pool.Dial(ctx, addr, "")
+ require.NoError(t, err)
+ verifyConnection(t, conn, codes.OK)
+
+ stop1() // stop the server and all open connections
+ stop1 = func() {}
+
+ cfg, err := starter.ParseEndpoint(addr)
+ require.NoError(t, err)
+
+ // start server on the same address (simulation of service restart) but with token verification enabled
+ _, _, stop2 := runServerWithAddr(t, "token", cfg.Addr)
+ defer stop2()
+
+ // all good - another server with token verification is running on the same address and new connection was established
+ conn, err = pool.Dial(ctx, addr, "token")
+ require.NoError(t, err)
+ verifyConnection(t, conn, codes.OK)
+}