Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2020-07-07 09:36:41 +0300
committerPaul Okstad <pokstad@gitlab.com>2020-07-07 09:36:41 +0300
commitddd0d96b21524689e205c11aeca408ffc71e7721 (patch)
tree751d6bc72b447016a14f762bd2e3b38ba9a21aa0
parent733498cddcff02dae73ab06e4763d60c297d8292 (diff)
parentdf8c239817debc400f153c13f494104befe4e81d (diff)
Merge branch 'pks-connection-pool' into 'master'
Connection pool Closes #2933 See merge request gitlab-org/gitaly!2345
-rw-r--r--internal/connection/pool.go86
-rw-r--r--internal/connection/pool_test.go195
-rw-r--r--internal/helper/storage.go18
-rw-r--r--internal/praefect/metadata/server.go20
-rw-r--r--internal/service/hook/pre_receive.go25
-rw-r--r--internal/service/hook/server.go17
-rw-r--r--internal/service/repository/clone_from_pool_internal.go2
-rw-r--r--internal/service/repository/replicate.go58
-rw-r--r--internal/service/repository/server.go13
-rw-r--r--internal/service/repository/server_test.go45
10 files changed, 311 insertions, 168 deletions
diff --git a/internal/connection/pool.go b/internal/connection/pool.go
new file mode 100644
index 000000000..7a8575e68
--- /dev/null
+++ b/internal/connection/pool.go
@@ -0,0 +1,86 @@
+package connection
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+
+ gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
+ "gitlab.com/gitlab-org/gitaly/client"
+ "google.golang.org/grpc"
+)
+
+// Pool is a pool of GRPC connections. Connections created by it are safe for
+// concurrent use.
+type Pool struct {
+ lock sync.RWMutex
+ connsByAddress map[string]*grpc.ClientConn
+}
+
+// NewPool creates a new connection pool that's ready for use.
+func NewPool() *Pool {
+ return &Pool{
+ connsByAddress: make(map[string]*grpc.ClientConn),
+ }
+}
+
+// Close closes all connections tracked by the connection pool.
+func (p *Pool) Close() error {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ var firstError error
+ for addr, conn := range p.connsByAddress {
+ if err := conn.Close(); err != nil && firstError == nil {
+ firstError = err
+ }
+
+ delete(p.connsByAddress, addr)
+ }
+
+ return firstError
+}
+
+// Dial creates a new client connection in case no connection to the given
+// address exists already or returns an already established connection. The
+// returned address must not be `Close()`d.
+func (p *Pool) Dial(ctx context.Context, address, token string) (*grpc.ClientConn, error) {
+ return p.getOrCreateConnection(ctx, address, token)
+}
+
+func (p *Pool) getOrCreateConnection(ctx context.Context, address, token string) (*grpc.ClientConn, error) {
+ if address == "" {
+ return nil, errors.New("address is empty")
+ }
+
+ p.lock.RLock()
+ cc, ok := p.connsByAddress[address]
+ p.lock.RUnlock()
+
+ if ok {
+ return cc, nil
+ }
+
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ cc, ok = p.connsByAddress[address]
+ if ok {
+ return cc, nil
+ }
+
+ var opts []grpc.DialOption
+ if token != "" {
+ opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token)))
+ }
+
+ cc, err := client.DialContext(ctx, address, opts)
+ if err != nil {
+ return nil, fmt.Errorf("could not dial source: %v", err)
+ }
+
+ p.connsByAddress[address] = cc
+
+ return cc, nil
+}
diff --git a/internal/connection/pool_test.go b/internal/connection/pool_test.go
new file mode 100644
index 000000000..4da9a0a23
--- /dev/null
+++ b/internal/connection/pool_test.go
@@ -0,0 +1,195 @@
+package connection
+
+import (
+ "context"
+ "net"
+ "sync"
+ "testing"
+ "time"
+
+ grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
+ "github.com/stretchr/testify/require"
+ gitaly_auth "gitlab.com/gitlab-org/gitaly/internal/config/auth"
+ "gitlab.com/gitlab-org/gitaly/internal/server/auth"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/health"
+ "google.golang.org/grpc/health/grpc_health_v1"
+ "google.golang.org/grpc/status"
+)
+
+func TestPoolDial(t *testing.T) {
+ insecure, cleanup := runServer(t, "")
+ defer cleanup()
+
+ creds := "my-little-secret"
+ secure, cleanup := runServer(t, creds)
+ defer cleanup()
+
+ testCases := []struct {
+ desc string
+ test func(t *testing.T, ctx context.Context, pool *Pool)
+ }{
+ {
+ desc: "dialing once succeeds",
+ test: func(t *testing.T, ctx context.Context, pool *Pool) {
+ conn, err := pool.Dial(ctx, insecure, "")
+ require.NoError(t, err)
+ verifyConnection(t, conn, codes.OK)
+ },
+ },
+ {
+ desc: "dialing multiple times succeeds",
+ test: func(t *testing.T, ctx context.Context, pool *Pool) {
+ for i := 0; i < 10; i++ {
+ conn, err := pool.Dial(ctx, insecure, "")
+ require.NoError(t, err)
+ verifyConnection(t, conn, codes.OK)
+ }
+ },
+ },
+ {
+ desc: "redialing after close succeeds",
+ test: func(t *testing.T, ctx context.Context, pool *Pool) {
+ conn, err := pool.Dial(ctx, insecure, "")
+ require.NoError(t, err)
+ verifyConnection(t, conn, codes.OK)
+
+ require.NoError(t, pool.Close())
+
+ conn, err = pool.Dial(ctx, insecure, "")
+ require.NoError(t, err)
+ verifyConnection(t, conn, codes.OK)
+ },
+ },
+ {
+ desc: "dialing invalid fails",
+ test: func(t *testing.T, ctx context.Context, pool *Pool) {
+ conn, err := pool.Dial(ctx, "foo/bar", "")
+ require.Error(t, err)
+ require.Nil(t, conn)
+ },
+ },
+ {
+ desc: "dialing empty fails",
+ test: func(t *testing.T, ctx context.Context, pool *Pool) {
+ conn, err := pool.Dial(ctx, "", "")
+ require.Error(t, err)
+ require.Nil(t, conn)
+ },
+ },
+ {
+ desc: "dialing concurrently succeeds",
+ test: func(t *testing.T, ctx context.Context, pool *Pool) {
+ wg := sync.WaitGroup{}
+
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+
+ go func() {
+ defer wg.Done()
+ conn, err := pool.Dial(ctx, insecure, "")
+ require.NoError(t, err)
+ verifyConnection(t, conn, codes.OK)
+ }()
+ }
+
+ wg.Wait()
+ },
+ },
+ {
+ desc: "dialing with credentials succeeds",
+ test: func(t *testing.T, ctx context.Context, pool *Pool) {
+ conn, err := pool.Dial(ctx, secure, creds)
+ require.NoError(t, err)
+ verifyConnection(t, conn, codes.OK)
+ },
+ },
+ {
+ desc: "dialing with invalid credentials fails",
+ test: func(t *testing.T, ctx context.Context, pool *Pool) {
+ conn, err := pool.Dial(ctx, secure, "invalid-credential")
+ require.NoError(t, err)
+ verifyConnection(t, conn, codes.PermissionDenied)
+ },
+ },
+ {
+ desc: "dialing with missing credentials fails",
+ test: func(t *testing.T, ctx context.Context, pool *Pool) {
+ conn, err := pool.Dial(ctx, secure, "")
+ require.NoError(t, err)
+ verifyConnection(t, conn, codes.Unauthenticated)
+ },
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ pool := NewPool()
+ defer func() {
+ require.NoError(t, pool.Close())
+ }()
+
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
+
+ tc.test(t, ctx, pool)
+ })
+ }
+}
+
+func runServer(t *testing.T, creds string) (string, func()) {
+ t.Helper()
+
+ var opts []grpc.ServerOption
+ if creds != "" {
+ opts = []grpc.ServerOption{
+ grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
+ auth.StreamServerInterceptor(gitaly_auth.Config{
+ Token: creds,
+ }),
+ )),
+ grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
+ auth.UnaryServerInterceptor(gitaly_auth.Config{
+ Token: creds,
+ }),
+ )),
+ }
+ }
+
+ server := grpc.NewServer(opts...)
+
+ healthServer := health.NewServer()
+ grpc_health_v1.RegisterHealthServer(server, healthServer)
+ healthServer.SetServingStatus("TestService", grpc_health_v1.HealthCheckResponse_SERVING)
+
+ listener, err := net.Listen("tcp", "127.0.0.1:0")
+ require.NoError(t, err)
+
+ errQ := make(chan error)
+ go func() {
+ errQ <- server.Serve(listener)
+ }()
+
+ return "tcp://" + listener.Addr().String(), func() {
+ server.Stop()
+ require.NoError(t, <-errQ)
+ }
+}
+
+func verifyConnection(t *testing.T, conn *grpc.ClientConn, expectedCode codes.Code) {
+ t.Helper()
+
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
+
+ _, err := grpc_health_v1.NewHealthClient(conn).Check(ctx, &grpc_health_v1.HealthCheckRequest{
+ Service: "TestService",
+ })
+
+ if expectedCode == codes.OK {
+ require.NoError(t, err)
+ } else {
+ require.Equal(t, expectedCode, status.Code(err))
+ }
+}
diff --git a/internal/helper/storage.go b/internal/helper/storage.go
index 542e62d69..523ca7ed6 100644
--- a/internal/helper/storage.go
+++ b/internal/helper/storage.go
@@ -7,10 +7,7 @@ import (
"errors"
"fmt"
- gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
- "gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/storage"
- "google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
@@ -79,18 +76,3 @@ func InjectGitalyServers(ctx context.Context, name, address, token string) (cont
return metadata.NewOutgoingContext(ctx, metadata.Pairs("gitaly-servers", base64.StdEncoding.EncodeToString(gitalyServersJSON))), nil
}
-
-// DialServer creates a client connection for a gitaly server
-func DialServer(gitalyServer map[string]string) (*grpc.ClientConn, error) {
- connOpts := []grpc.DialOption{
- grpc.WithInsecure(),
- grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(gitalyServer["token"])),
- }
-
- conn, err := client.Dial(gitalyServer["address"], connOpts)
- if err != nil {
- return nil, fmt.Errorf("could not dial source: %v", err)
- }
-
- return conn, nil
-}
diff --git a/internal/praefect/metadata/server.go b/internal/praefect/metadata/server.go
index 1a19a1e6f..75d98d9f5 100644
--- a/internal/praefect/metadata/server.go
+++ b/internal/praefect/metadata/server.go
@@ -10,11 +10,8 @@ import (
"net/url"
"strings"
- gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
- "gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/bootstrap/starter"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@@ -251,20 +248,3 @@ func (p *PraefectServer) Address() (string, error) {
return "", errors.New("no address configured")
}
-
-// Dial will try to connect to the given Praefect server
-func (p *PraefectServer) Dial(ctx context.Context) (*grpc.ClientConn, error) {
- opts := []grpc.DialOption{
- grpc.WithBlock(),
- }
- if p.Token != "" {
- opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(p.Token)))
- }
-
- address, err := p.Address()
- if err != nil {
- return nil, err
- }
-
- return client.DialContext(ctx, address, opts)
-}
diff --git a/internal/service/hook/pre_receive.go b/internal/service/hook/pre_receive.go
index c5053661c..32d0867ff 100644
--- a/internal/service/hook/pre_receive.go
+++ b/internal/service/hook/pre_receive.go
@@ -68,30 +68,7 @@ func (s *server) getPraefectConn(ctx context.Context, server *metadata.PraefectS
if err != nil {
return nil, err
}
-
- s.mutex.RLock()
- conn, ok := s.praefectConnPool[address]
- s.mutex.RUnlock()
-
- if ok {
- return conn, nil
- }
-
- s.mutex.Lock()
- defer s.mutex.Unlock()
-
- conn, ok = s.praefectConnPool[address]
- if !ok {
- var err error
- conn, err = server.Dial(ctx)
- if err != nil {
- return nil, err
- }
-
- s.praefectConnPool[address] = conn
- }
-
- return conn, nil
+ return s.conns.Dial(ctx, address, server.Token)
}
func (s *server) voteOnTransaction(stream gitalypb.HookService_PreReceiveHookServer, hash []byte, env []string) error {
diff --git a/internal/service/hook/server.go b/internal/service/hook/server.go
index ed1171861..a95166bd9 100644
--- a/internal/service/hook/server.go
+++ b/internal/service/hook/server.go
@@ -1,25 +1,22 @@
package hook
import (
- "sync"
-
"gitlab.com/gitlab-org/gitaly/internal/config"
+ "gitlab.com/gitlab-org/gitaly/internal/connection"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
- "google.golang.org/grpc"
)
type server struct {
- mutex sync.RWMutex
- praefectConnPool map[string]*grpc.ClientConn
- hooksConfig config.Hooks
- gitlabAPI GitlabAPI
+ conns *connection.Pool
+ hooksConfig config.Hooks
+ gitlabAPI GitlabAPI
}
// NewServer creates a new instance of a gRPC namespace server
func NewServer(gitlab GitlabAPI, hooksConfig config.Hooks) gitalypb.HookServiceServer {
return &server{
- gitlabAPI: gitlab,
- hooksConfig: hooksConfig,
- praefectConnPool: make(map[string]*grpc.ClientConn),
+ gitlabAPI: gitlab,
+ hooksConfig: hooksConfig,
+ conns: connection.NewPool(),
}
}
diff --git a/internal/service/repository/clone_from_pool_internal.go b/internal/service/repository/clone_from_pool_internal.go
index 169357541..2e5d99779 100644
--- a/internal/service/repository/clone_from_pool_internal.go
+++ b/internal/service/repository/clone_from_pool_internal.go
@@ -26,7 +26,7 @@ func (s *server) CloneFromPoolInternal(ctx context.Context, req *gitalypb.CloneF
return nil, helper.ErrInternal(err)
}
- client, err := s.newRemoteClient()
+ client, err := s.newRemoteClient(ctx)
if err != nil {
return nil, helper.ErrInternalf("getting remote service client: %v", err)
}
diff --git a/internal/service/repository/replicate.go b/internal/service/repository/replicate.go
index ba15ebdeb..75a7508fc 100644
--- a/internal/service/repository/replicate.go
+++ b/internal/service/repository/replicate.go
@@ -10,8 +10,6 @@ import (
"path/filepath"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
- gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
- "gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/command"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/safe"
@@ -20,7 +18,6 @@ import (
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/streamio"
"golang.org/x/sync/errgroup"
- "google.golang.org/grpc"
)
func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) (*gitalypb.ReplicateRepositoryResponse, error) {
@@ -154,7 +151,7 @@ func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateR
}
func (s *server) syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
- remoteClient, err := s.newRemoteClient()
+ remoteClient, err := s.newRemoteClient(ctx)
if err != nil {
return err
}
@@ -224,67 +221,26 @@ func (s *server) syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateR
}
// newRemoteClient creates a new RemoteClient that talks to the same gitaly server
-func (s *server) newRemoteClient() (gitalypb.RemoteServiceClient, error) {
- cc, err := s.getOrCreateConnection(fmt.Sprintf("unix:%s", s.internalGitalySocket), "")
+func (s *server) newRemoteClient(ctx context.Context) (gitalypb.RemoteServiceClient, error) {
+ conn, err := s.conns.Dial(ctx, fmt.Sprintf("unix:%s", s.internalGitalySocket), "")
if err != nil {
return nil, err
}
- return gitalypb.NewRemoteServiceClient(cc), nil
+ return gitalypb.NewRemoteServiceClient(conn), nil
}
// newRepoClient creates a new RepositoryClient that talks to the gitaly of the source repository
func (s *server) newRepoClient(ctx context.Context, storageName string) (gitalypb.RepositoryServiceClient, error) {
- conn, err := s.getConnectionByStorage(ctx, storageName)
- if err != nil {
- return nil, err
- }
-
- return gitalypb.NewRepositoryServiceClient(conn), nil
-}
-
-func (s *server) getConnectionByStorage(ctx context.Context, storageName string) (*grpc.ClientConn, error) {
gitalyServerInfo, err := helper.ExtractGitalyServer(ctx, storageName)
if err != nil {
return nil, err
}
- return s.getOrCreateConnection(gitalyServerInfo["address"], gitalyServerInfo["token"])
-}
-
-func (s *server) getOrCreateConnection(address, token string) (*grpc.ClientConn, error) {
- if address == "" {
- return nil, errors.New("address is empty")
- }
-
- s.connsMtx.RLock()
- cc, ok := s.connsByAddress[address]
- s.connsMtx.RUnlock()
-
- if ok {
- return cc, nil
- }
-
- s.connsMtx.Lock()
- defer s.connsMtx.Unlock()
-
- connOpts := []grpc.DialOption{grpc.WithInsecure()}
-
- if token != "" {
- connOpts = append(connOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token)))
- }
-
- cc, ok = s.connsByAddress[address]
- if ok {
- return cc, nil
- }
-
- cc, err := client.Dial(address, connOpts)
+ conn, err := s.conns.Dial(ctx, gitalyServerInfo["address"], gitalyServerInfo["token"])
if err != nil {
- return nil, fmt.Errorf("could not dial source: %v", err)
+ return nil, err
}
- s.connsByAddress[address] = cc
-
- return cc, nil
+ return gitalypb.NewRepositoryServiceClient(conn), nil
}
diff --git a/internal/service/repository/server.go b/internal/service/repository/server.go
index 4a40cc28f..f6523ceea 100644
--- a/internal/service/repository/server.go
+++ b/internal/service/repository/server.go
@@ -2,27 +2,30 @@ package repository
import (
"context"
- "sync"
+ "gitlab.com/gitlab-org/gitaly/internal/connection"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
"gitlab.com/gitlab-org/gitaly/internal/storage"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
- "google.golang.org/grpc"
)
type server struct {
ruby *rubyserver.Server
gitalypb.UnimplementedRepositoryServiceServer
- connsByAddress map[string]*grpc.ClientConn
- connsMtx sync.RWMutex
+ conns *connection.Pool
internalGitalySocket string
locator storage.Locator
}
// NewServer creates a new instance of a gRPC repo server
func NewServer(rs *rubyserver.Server, locator storage.Locator, internalGitalySocket string) gitalypb.RepositoryServiceServer {
- return &server{ruby: rs, locator: locator, connsByAddress: make(map[string]*grpc.ClientConn), internalGitalySocket: internalGitalySocket}
+ return &server{
+ ruby: rs,
+ locator: locator,
+ conns: connection.NewPool(),
+ internalGitalySocket: internalGitalySocket,
+ }
}
func (*server) FetchHTTPRemote(context.Context, *gitalypb.FetchHTTPRemoteRequest) (*gitalypb.FetchHTTPRemoteResponse, error) {
diff --git a/internal/service/repository/server_test.go b/internal/service/repository/server_test.go
index 6ef9b0939..434db0a7d 100644
--- a/internal/service/repository/server_test.go
+++ b/internal/service/repository/server_test.go
@@ -1,19 +1,20 @@
package repository
import (
- "sync"
"testing"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/connection"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
- "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
- "google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
func TestGetConnectionByStorage(t *testing.T) {
- s := server{connsByAddress: make(map[string]*grpc.ClientConn)}
+ connPool := connection.NewPool()
+ defer connPool.Close()
+
+ s := server{conns: connPool}
ctx, cancel := testhelper.Context()
defer cancel()
@@ -27,40 +28,6 @@ func TestGetConnectionByStorage(t *testing.T) {
incomingCtx := metadata.NewIncomingContext(ctx, md)
- cc, err := s.getConnectionByStorage(incomingCtx, storageName)
- require.NoError(t, err)
-
- cc1, err := s.getConnectionByStorage(incomingCtx, storageName)
+ _, err = s.newRepoClient(incomingCtx, storageName)
require.NoError(t, err)
- require.True(t, cc == cc1, "cc1 should be the cached copy")
-}
-
-func TestGetConnectionsConcurrentAccess(t *testing.T) {
- s := server{connsByAddress: make(map[string]*grpc.ClientConn)}
-
- address := "unix://fake/address/wont/work"
-
- var remoteClient gitalypb.RemoteServiceClient
- var cc *grpc.ClientConn
-
- var wg sync.WaitGroup
- wg.Add(2)
-
- go func() {
- var err error
- cc, err = s.getOrCreateConnection(address, "")
- require.NoError(t, err)
- wg.Done()
- }()
-
- go func() {
- var err error
- remoteClient, err = s.newRemoteClient()
- require.NoError(t, err)
- wg.Done()
- }()
-
- wg.Wait()
- require.NotNil(t, cc)
- require.NotNil(t, remoteClient)
}