diff options
author | Paul Okstad <pokstad@gitlab.com> | 2020-07-07 09:36:41 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2020-07-07 09:36:41 +0300 |
commit | ddd0d96b21524689e205c11aeca408ffc71e7721 (patch) | |
tree | 751d6bc72b447016a14f762bd2e3b38ba9a21aa0 | |
parent | 733498cddcff02dae73ab06e4763d60c297d8292 (diff) | |
parent | df8c239817debc400f153c13f494104befe4e81d (diff) |
Merge branch 'pks-connection-pool' into 'master'
Connection pool
Closes #2933
See merge request gitlab-org/gitaly!2345
-rw-r--r-- | internal/connection/pool.go | 86 | ||||
-rw-r--r-- | internal/connection/pool_test.go | 195 | ||||
-rw-r--r-- | internal/helper/storage.go | 18 | ||||
-rw-r--r-- | internal/praefect/metadata/server.go | 20 | ||||
-rw-r--r-- | internal/service/hook/pre_receive.go | 25 | ||||
-rw-r--r-- | internal/service/hook/server.go | 17 | ||||
-rw-r--r-- | internal/service/repository/clone_from_pool_internal.go | 2 | ||||
-rw-r--r-- | internal/service/repository/replicate.go | 58 | ||||
-rw-r--r-- | internal/service/repository/server.go | 13 | ||||
-rw-r--r-- | internal/service/repository/server_test.go | 45 |
10 files changed, 311 insertions, 168 deletions
diff --git a/internal/connection/pool.go b/internal/connection/pool.go new file mode 100644 index 000000000..7a8575e68 --- /dev/null +++ b/internal/connection/pool.go @@ -0,0 +1,86 @@ +package connection + +import ( + "context" + "errors" + "fmt" + "sync" + + gitalyauth "gitlab.com/gitlab-org/gitaly/auth" + "gitlab.com/gitlab-org/gitaly/client" + "google.golang.org/grpc" +) + +// Pool is a pool of GRPC connections. Connections created by it are safe for +// concurrent use. +type Pool struct { + lock sync.RWMutex + connsByAddress map[string]*grpc.ClientConn +} + +// NewPool creates a new connection pool that's ready for use. +func NewPool() *Pool { + return &Pool{ + connsByAddress: make(map[string]*grpc.ClientConn), + } +} + +// Close closes all connections tracked by the connection pool. +func (p *Pool) Close() error { + p.lock.Lock() + defer p.lock.Unlock() + + var firstError error + for addr, conn := range p.connsByAddress { + if err := conn.Close(); err != nil && firstError == nil { + firstError = err + } + + delete(p.connsByAddress, addr) + } + + return firstError +} + +// Dial creates a new client connection in case no connection to the given +// address exists already or returns an already established connection. The +// returned address must not be `Close()`d. +func (p *Pool) Dial(ctx context.Context, address, token string) (*grpc.ClientConn, error) { + return p.getOrCreateConnection(ctx, address, token) +} + +func (p *Pool) getOrCreateConnection(ctx context.Context, address, token string) (*grpc.ClientConn, error) { + if address == "" { + return nil, errors.New("address is empty") + } + + p.lock.RLock() + cc, ok := p.connsByAddress[address] + p.lock.RUnlock() + + if ok { + return cc, nil + } + + p.lock.Lock() + defer p.lock.Unlock() + + cc, ok = p.connsByAddress[address] + if ok { + return cc, nil + } + + var opts []grpc.DialOption + if token != "" { + opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token))) + } + + cc, err := client.DialContext(ctx, address, opts) + if err != nil { + return nil, fmt.Errorf("could not dial source: %v", err) + } + + p.connsByAddress[address] = cc + + return cc, nil +} diff --git a/internal/connection/pool_test.go b/internal/connection/pool_test.go new file mode 100644 index 000000000..4da9a0a23 --- /dev/null +++ b/internal/connection/pool_test.go @@ -0,0 +1,195 @@ +package connection + +import ( + "context" + "net" + "sync" + "testing" + "time" + + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + "github.com/stretchr/testify/require" + gitaly_auth "gitlab.com/gitlab-org/gitaly/internal/config/auth" + "gitlab.com/gitlab-org/gitaly/internal/server/auth" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" +) + +func TestPoolDial(t *testing.T) { + insecure, cleanup := runServer(t, "") + defer cleanup() + + creds := "my-little-secret" + secure, cleanup := runServer(t, creds) + defer cleanup() + + testCases := []struct { + desc string + test func(t *testing.T, ctx context.Context, pool *Pool) + }{ + { + desc: "dialing once succeeds", + test: func(t *testing.T, ctx context.Context, pool *Pool) { + conn, err := pool.Dial(ctx, insecure, "") + require.NoError(t, err) + verifyConnection(t, conn, codes.OK) + }, + }, + { + desc: "dialing multiple times succeeds", + test: func(t *testing.T, ctx context.Context, pool *Pool) { + for i := 0; i < 10; i++ { + conn, err := pool.Dial(ctx, insecure, "") + require.NoError(t, err) + verifyConnection(t, conn, codes.OK) + } + }, + }, + { + desc: "redialing after close succeeds", + test: func(t *testing.T, ctx context.Context, pool *Pool) { + conn, err := pool.Dial(ctx, insecure, "") + require.NoError(t, err) + verifyConnection(t, conn, codes.OK) + + require.NoError(t, pool.Close()) + + conn, err = pool.Dial(ctx, insecure, "") + require.NoError(t, err) + verifyConnection(t, conn, codes.OK) + }, + }, + { + desc: "dialing invalid fails", + test: func(t *testing.T, ctx context.Context, pool *Pool) { + conn, err := pool.Dial(ctx, "foo/bar", "") + require.Error(t, err) + require.Nil(t, conn) + }, + }, + { + desc: "dialing empty fails", + test: func(t *testing.T, ctx context.Context, pool *Pool) { + conn, err := pool.Dial(ctx, "", "") + require.Error(t, err) + require.Nil(t, conn) + }, + }, + { + desc: "dialing concurrently succeeds", + test: func(t *testing.T, ctx context.Context, pool *Pool) { + wg := sync.WaitGroup{} + + for i := 0; i < 10; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + conn, err := pool.Dial(ctx, insecure, "") + require.NoError(t, err) + verifyConnection(t, conn, codes.OK) + }() + } + + wg.Wait() + }, + }, + { + desc: "dialing with credentials succeeds", + test: func(t *testing.T, ctx context.Context, pool *Pool) { + conn, err := pool.Dial(ctx, secure, creds) + require.NoError(t, err) + verifyConnection(t, conn, codes.OK) + }, + }, + { + desc: "dialing with invalid credentials fails", + test: func(t *testing.T, ctx context.Context, pool *Pool) { + conn, err := pool.Dial(ctx, secure, "invalid-credential") + require.NoError(t, err) + verifyConnection(t, conn, codes.PermissionDenied) + }, + }, + { + desc: "dialing with missing credentials fails", + test: func(t *testing.T, ctx context.Context, pool *Pool) { + conn, err := pool.Dial(ctx, secure, "") + require.NoError(t, err) + verifyConnection(t, conn, codes.Unauthenticated) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + pool := NewPool() + defer func() { + require.NoError(t, pool.Close()) + }() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + tc.test(t, ctx, pool) + }) + } +} + +func runServer(t *testing.T, creds string) (string, func()) { + t.Helper() + + var opts []grpc.ServerOption + if creds != "" { + opts = []grpc.ServerOption{ + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( + auth.StreamServerInterceptor(gitaly_auth.Config{ + Token: creds, + }), + )), + grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( + auth.UnaryServerInterceptor(gitaly_auth.Config{ + Token: creds, + }), + )), + } + } + + server := grpc.NewServer(opts...) + + healthServer := health.NewServer() + grpc_health_v1.RegisterHealthServer(server, healthServer) + healthServer.SetServingStatus("TestService", grpc_health_v1.HealthCheckResponse_SERVING) + + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + errQ := make(chan error) + go func() { + errQ <- server.Serve(listener) + }() + + return "tcp://" + listener.Addr().String(), func() { + server.Stop() + require.NoError(t, <-errQ) + } +} + +func verifyConnection(t *testing.T, conn *grpc.ClientConn, expectedCode codes.Code) { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + _, err := grpc_health_v1.NewHealthClient(conn).Check(ctx, &grpc_health_v1.HealthCheckRequest{ + Service: "TestService", + }) + + if expectedCode == codes.OK { + require.NoError(t, err) + } else { + require.Equal(t, expectedCode, status.Code(err)) + } +} diff --git a/internal/helper/storage.go b/internal/helper/storage.go index 542e62d69..523ca7ed6 100644 --- a/internal/helper/storage.go +++ b/internal/helper/storage.go @@ -7,10 +7,7 @@ import ( "errors" "fmt" - gitalyauth "gitlab.com/gitlab-org/gitaly/auth" - "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/storage" - "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -79,18 +76,3 @@ func InjectGitalyServers(ctx context.Context, name, address, token string) (cont return metadata.NewOutgoingContext(ctx, metadata.Pairs("gitaly-servers", base64.StdEncoding.EncodeToString(gitalyServersJSON))), nil } - -// DialServer creates a client connection for a gitaly server -func DialServer(gitalyServer map[string]string) (*grpc.ClientConn, error) { - connOpts := []grpc.DialOption{ - grpc.WithInsecure(), - grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(gitalyServer["token"])), - } - - conn, err := client.Dial(gitalyServer["address"], connOpts) - if err != nil { - return nil, fmt.Errorf("could not dial source: %v", err) - } - - return conn, nil -} diff --git a/internal/praefect/metadata/server.go b/internal/praefect/metadata/server.go index 1a19a1e6f..75d98d9f5 100644 --- a/internal/praefect/metadata/server.go +++ b/internal/praefect/metadata/server.go @@ -10,11 +10,8 @@ import ( "net/url" "strings" - gitalyauth "gitlab.com/gitlab-org/gitaly/auth" - "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/bootstrap/starter" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" @@ -251,20 +248,3 @@ func (p *PraefectServer) Address() (string, error) { return "", errors.New("no address configured") } - -// Dial will try to connect to the given Praefect server -func (p *PraefectServer) Dial(ctx context.Context) (*grpc.ClientConn, error) { - opts := []grpc.DialOption{ - grpc.WithBlock(), - } - if p.Token != "" { - opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(p.Token))) - } - - address, err := p.Address() - if err != nil { - return nil, err - } - - return client.DialContext(ctx, address, opts) -} diff --git a/internal/service/hook/pre_receive.go b/internal/service/hook/pre_receive.go index c5053661c..32d0867ff 100644 --- a/internal/service/hook/pre_receive.go +++ b/internal/service/hook/pre_receive.go @@ -68,30 +68,7 @@ func (s *server) getPraefectConn(ctx context.Context, server *metadata.PraefectS if err != nil { return nil, err } - - s.mutex.RLock() - conn, ok := s.praefectConnPool[address] - s.mutex.RUnlock() - - if ok { - return conn, nil - } - - s.mutex.Lock() - defer s.mutex.Unlock() - - conn, ok = s.praefectConnPool[address] - if !ok { - var err error - conn, err = server.Dial(ctx) - if err != nil { - return nil, err - } - - s.praefectConnPool[address] = conn - } - - return conn, nil + return s.conns.Dial(ctx, address, server.Token) } func (s *server) voteOnTransaction(stream gitalypb.HookService_PreReceiveHookServer, hash []byte, env []string) error { diff --git a/internal/service/hook/server.go b/internal/service/hook/server.go index ed1171861..a95166bd9 100644 --- a/internal/service/hook/server.go +++ b/internal/service/hook/server.go @@ -1,25 +1,22 @@ package hook import ( - "sync" - "gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/connection" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - "google.golang.org/grpc" ) type server struct { - mutex sync.RWMutex - praefectConnPool map[string]*grpc.ClientConn - hooksConfig config.Hooks - gitlabAPI GitlabAPI + conns *connection.Pool + hooksConfig config.Hooks + gitlabAPI GitlabAPI } // NewServer creates a new instance of a gRPC namespace server func NewServer(gitlab GitlabAPI, hooksConfig config.Hooks) gitalypb.HookServiceServer { return &server{ - gitlabAPI: gitlab, - hooksConfig: hooksConfig, - praefectConnPool: make(map[string]*grpc.ClientConn), + gitlabAPI: gitlab, + hooksConfig: hooksConfig, + conns: connection.NewPool(), } } diff --git a/internal/service/repository/clone_from_pool_internal.go b/internal/service/repository/clone_from_pool_internal.go index 169357541..2e5d99779 100644 --- a/internal/service/repository/clone_from_pool_internal.go +++ b/internal/service/repository/clone_from_pool_internal.go @@ -26,7 +26,7 @@ func (s *server) CloneFromPoolInternal(ctx context.Context, req *gitalypb.CloneF return nil, helper.ErrInternal(err) } - client, err := s.newRemoteClient() + client, err := s.newRemoteClient(ctx) if err != nil { return nil, helper.ErrInternalf("getting remote service client: %v", err) } diff --git a/internal/service/repository/replicate.go b/internal/service/repository/replicate.go index ba15ebdeb..75a7508fc 100644 --- a/internal/service/repository/replicate.go +++ b/internal/service/repository/replicate.go @@ -10,8 +10,6 @@ import ( "path/filepath" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" - gitalyauth "gitlab.com/gitlab-org/gitaly/auth" - "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/command" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/safe" @@ -20,7 +18,6 @@ import ( "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/streamio" "golang.org/x/sync/errgroup" - "google.golang.org/grpc" ) func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) (*gitalypb.ReplicateRepositoryResponse, error) { @@ -154,7 +151,7 @@ func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateR } func (s *server) syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error { - remoteClient, err := s.newRemoteClient() + remoteClient, err := s.newRemoteClient(ctx) if err != nil { return err } @@ -224,67 +221,26 @@ func (s *server) syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateR } // newRemoteClient creates a new RemoteClient that talks to the same gitaly server -func (s *server) newRemoteClient() (gitalypb.RemoteServiceClient, error) { - cc, err := s.getOrCreateConnection(fmt.Sprintf("unix:%s", s.internalGitalySocket), "") +func (s *server) newRemoteClient(ctx context.Context) (gitalypb.RemoteServiceClient, error) { + conn, err := s.conns.Dial(ctx, fmt.Sprintf("unix:%s", s.internalGitalySocket), "") if err != nil { return nil, err } - return gitalypb.NewRemoteServiceClient(cc), nil + return gitalypb.NewRemoteServiceClient(conn), nil } // newRepoClient creates a new RepositoryClient that talks to the gitaly of the source repository func (s *server) newRepoClient(ctx context.Context, storageName string) (gitalypb.RepositoryServiceClient, error) { - conn, err := s.getConnectionByStorage(ctx, storageName) - if err != nil { - return nil, err - } - - return gitalypb.NewRepositoryServiceClient(conn), nil -} - -func (s *server) getConnectionByStorage(ctx context.Context, storageName string) (*grpc.ClientConn, error) { gitalyServerInfo, err := helper.ExtractGitalyServer(ctx, storageName) if err != nil { return nil, err } - return s.getOrCreateConnection(gitalyServerInfo["address"], gitalyServerInfo["token"]) -} - -func (s *server) getOrCreateConnection(address, token string) (*grpc.ClientConn, error) { - if address == "" { - return nil, errors.New("address is empty") - } - - s.connsMtx.RLock() - cc, ok := s.connsByAddress[address] - s.connsMtx.RUnlock() - - if ok { - return cc, nil - } - - s.connsMtx.Lock() - defer s.connsMtx.Unlock() - - connOpts := []grpc.DialOption{grpc.WithInsecure()} - - if token != "" { - connOpts = append(connOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token))) - } - - cc, ok = s.connsByAddress[address] - if ok { - return cc, nil - } - - cc, err := client.Dial(address, connOpts) + conn, err := s.conns.Dial(ctx, gitalyServerInfo["address"], gitalyServerInfo["token"]) if err != nil { - return nil, fmt.Errorf("could not dial source: %v", err) + return nil, err } - s.connsByAddress[address] = cc - - return cc, nil + return gitalypb.NewRepositoryServiceClient(conn), nil } diff --git a/internal/service/repository/server.go b/internal/service/repository/server.go index 4a40cc28f..f6523ceea 100644 --- a/internal/service/repository/server.go +++ b/internal/service/repository/server.go @@ -2,27 +2,30 @@ package repository import ( "context" - "sync" + "gitlab.com/gitlab-org/gitaly/internal/connection" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" "gitlab.com/gitlab-org/gitaly/internal/storage" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - "google.golang.org/grpc" ) type server struct { ruby *rubyserver.Server gitalypb.UnimplementedRepositoryServiceServer - connsByAddress map[string]*grpc.ClientConn - connsMtx sync.RWMutex + conns *connection.Pool internalGitalySocket string locator storage.Locator } // NewServer creates a new instance of a gRPC repo server func NewServer(rs *rubyserver.Server, locator storage.Locator, internalGitalySocket string) gitalypb.RepositoryServiceServer { - return &server{ruby: rs, locator: locator, connsByAddress: make(map[string]*grpc.ClientConn), internalGitalySocket: internalGitalySocket} + return &server{ + ruby: rs, + locator: locator, + conns: connection.NewPool(), + internalGitalySocket: internalGitalySocket, + } } func (*server) FetchHTTPRemote(context.Context, *gitalypb.FetchHTTPRemoteRequest) (*gitalypb.FetchHTTPRemoteResponse, error) { diff --git a/internal/service/repository/server_test.go b/internal/service/repository/server_test.go index 6ef9b0939..434db0a7d 100644 --- a/internal/service/repository/server_test.go +++ b/internal/service/repository/server_test.go @@ -1,19 +1,20 @@ package repository import ( - "sync" "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/connection" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/testhelper" - "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) func TestGetConnectionByStorage(t *testing.T) { - s := server{connsByAddress: make(map[string]*grpc.ClientConn)} + connPool := connection.NewPool() + defer connPool.Close() + + s := server{conns: connPool} ctx, cancel := testhelper.Context() defer cancel() @@ -27,40 +28,6 @@ func TestGetConnectionByStorage(t *testing.T) { incomingCtx := metadata.NewIncomingContext(ctx, md) - cc, err := s.getConnectionByStorage(incomingCtx, storageName) - require.NoError(t, err) - - cc1, err := s.getConnectionByStorage(incomingCtx, storageName) + _, err = s.newRepoClient(incomingCtx, storageName) require.NoError(t, err) - require.True(t, cc == cc1, "cc1 should be the cached copy") -} - -func TestGetConnectionsConcurrentAccess(t *testing.T) { - s := server{connsByAddress: make(map[string]*grpc.ClientConn)} - - address := "unix://fake/address/wont/work" - - var remoteClient gitalypb.RemoteServiceClient - var cc *grpc.ClientConn - - var wg sync.WaitGroup - wg.Add(2) - - go func() { - var err error - cc, err = s.getOrCreateConnection(address, "") - require.NoError(t, err) - wg.Done() - }() - - go func() { - var err error - remoteClient, err = s.newRemoteClient() - require.NoError(t, err) - wg.Done() - }() - - wg.Wait() - require.NotNil(t, cc) - require.NotNil(t, remoteClient) } |