diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-07-03 10:01:19 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-07-06 09:23:05 +0300 |
commit | 84e2e8eee975547cb24871215313a955b0c8f8ee (patch) | |
tree | a31af4f60e94af4e926fed52a8649bf1e2607a83 | |
parent | 1f267efcf53c05582922b4b4ce83318456c0b181 (diff) |
repository: Use new connection pool interface
Now that we've got a connection pooling interface available that manages
GRPC connections for us, let's use that one instead of custom-coding our
own.
-rw-r--r-- | internal/service/repository/clone_from_pool_internal.go | 2 | ||||
-rw-r--r-- | internal/service/repository/replicate.go | 58 | ||||
-rw-r--r-- | internal/service/repository/server.go | 13 | ||||
-rw-r--r-- | internal/service/repository/server_test.go | 45 |
4 files changed, 22 insertions, 96 deletions
diff --git a/internal/service/repository/clone_from_pool_internal.go b/internal/service/repository/clone_from_pool_internal.go index 169357541..2e5d99779 100644 --- a/internal/service/repository/clone_from_pool_internal.go +++ b/internal/service/repository/clone_from_pool_internal.go @@ -26,7 +26,7 @@ func (s *server) CloneFromPoolInternal(ctx context.Context, req *gitalypb.CloneF return nil, helper.ErrInternal(err) } - client, err := s.newRemoteClient() + client, err := s.newRemoteClient(ctx) if err != nil { return nil, helper.ErrInternalf("getting remote service client: %v", err) } diff --git a/internal/service/repository/replicate.go b/internal/service/repository/replicate.go index ba15ebdeb..75a7508fc 100644 --- a/internal/service/repository/replicate.go +++ b/internal/service/repository/replicate.go @@ -10,8 +10,6 @@ import ( "path/filepath" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" - gitalyauth "gitlab.com/gitlab-org/gitaly/auth" - "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/command" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/safe" @@ -20,7 +18,6 @@ import ( "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/streamio" "golang.org/x/sync/errgroup" - "google.golang.org/grpc" ) func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) (*gitalypb.ReplicateRepositoryResponse, error) { @@ -154,7 +151,7 @@ func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateR } func (s *server) syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error { - remoteClient, err := s.newRemoteClient() + remoteClient, err := s.newRemoteClient(ctx) if err != nil { return err } @@ -224,67 +221,26 @@ func (s *server) syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateR } // newRemoteClient creates a new RemoteClient that talks to the same gitaly server -func (s *server) newRemoteClient() (gitalypb.RemoteServiceClient, error) { - cc, err := s.getOrCreateConnection(fmt.Sprintf("unix:%s", s.internalGitalySocket), "") +func (s *server) newRemoteClient(ctx context.Context) (gitalypb.RemoteServiceClient, error) { + conn, err := s.conns.Dial(ctx, fmt.Sprintf("unix:%s", s.internalGitalySocket), "") if err != nil { return nil, err } - return gitalypb.NewRemoteServiceClient(cc), nil + return gitalypb.NewRemoteServiceClient(conn), nil } // newRepoClient creates a new RepositoryClient that talks to the gitaly of the source repository func (s *server) newRepoClient(ctx context.Context, storageName string) (gitalypb.RepositoryServiceClient, error) { - conn, err := s.getConnectionByStorage(ctx, storageName) - if err != nil { - return nil, err - } - - return gitalypb.NewRepositoryServiceClient(conn), nil -} - -func (s *server) getConnectionByStorage(ctx context.Context, storageName string) (*grpc.ClientConn, error) { gitalyServerInfo, err := helper.ExtractGitalyServer(ctx, storageName) if err != nil { return nil, err } - return s.getOrCreateConnection(gitalyServerInfo["address"], gitalyServerInfo["token"]) -} - -func (s *server) getOrCreateConnection(address, token string) (*grpc.ClientConn, error) { - if address == "" { - return nil, errors.New("address is empty") - } - - s.connsMtx.RLock() - cc, ok := s.connsByAddress[address] - s.connsMtx.RUnlock() - - if ok { - return cc, nil - } - - s.connsMtx.Lock() - defer s.connsMtx.Unlock() - - connOpts := []grpc.DialOption{grpc.WithInsecure()} - - if token != "" { - connOpts = append(connOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token))) - } - - cc, ok = s.connsByAddress[address] - if ok { - return cc, nil - } - - cc, err := client.Dial(address, connOpts) + conn, err := s.conns.Dial(ctx, gitalyServerInfo["address"], gitalyServerInfo["token"]) if err != nil { - return nil, fmt.Errorf("could not dial source: %v", err) + return nil, err } - s.connsByAddress[address] = cc - - return cc, nil + return gitalypb.NewRepositoryServiceClient(conn), nil } diff --git a/internal/service/repository/server.go b/internal/service/repository/server.go index 4a40cc28f..f6523ceea 100644 --- a/internal/service/repository/server.go +++ b/internal/service/repository/server.go @@ -2,27 +2,30 @@ package repository import ( "context" - "sync" + "gitlab.com/gitlab-org/gitaly/internal/connection" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" "gitlab.com/gitlab-org/gitaly/internal/storage" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - "google.golang.org/grpc" ) type server struct { ruby *rubyserver.Server gitalypb.UnimplementedRepositoryServiceServer - connsByAddress map[string]*grpc.ClientConn - connsMtx sync.RWMutex + conns *connection.Pool internalGitalySocket string locator storage.Locator } // NewServer creates a new instance of a gRPC repo server func NewServer(rs *rubyserver.Server, locator storage.Locator, internalGitalySocket string) gitalypb.RepositoryServiceServer { - return &server{ruby: rs, locator: locator, connsByAddress: make(map[string]*grpc.ClientConn), internalGitalySocket: internalGitalySocket} + return &server{ + ruby: rs, + locator: locator, + conns: connection.NewPool(), + internalGitalySocket: internalGitalySocket, + } } func (*server) FetchHTTPRemote(context.Context, *gitalypb.FetchHTTPRemoteRequest) (*gitalypb.FetchHTTPRemoteResponse, error) { diff --git a/internal/service/repository/server_test.go b/internal/service/repository/server_test.go index 6ef9b0939..434db0a7d 100644 --- a/internal/service/repository/server_test.go +++ b/internal/service/repository/server_test.go @@ -1,19 +1,20 @@ package repository import ( - "sync" "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/connection" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/testhelper" - "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) func TestGetConnectionByStorage(t *testing.T) { - s := server{connsByAddress: make(map[string]*grpc.ClientConn)} + connPool := connection.NewPool() + defer connPool.Close() + + s := server{conns: connPool} ctx, cancel := testhelper.Context() defer cancel() @@ -27,40 +28,6 @@ func TestGetConnectionByStorage(t *testing.T) { incomingCtx := metadata.NewIncomingContext(ctx, md) - cc, err := s.getConnectionByStorage(incomingCtx, storageName) - require.NoError(t, err) - - cc1, err := s.getConnectionByStorage(incomingCtx, storageName) + _, err = s.newRepoClient(incomingCtx, storageName) require.NoError(t, err) - require.True(t, cc == cc1, "cc1 should be the cached copy") -} - -func TestGetConnectionsConcurrentAccess(t *testing.T) { - s := server{connsByAddress: make(map[string]*grpc.ClientConn)} - - address := "unix://fake/address/wont/work" - - var remoteClient gitalypb.RemoteServiceClient - var cc *grpc.ClientConn - - var wg sync.WaitGroup - wg.Add(2) - - go func() { - var err error - cc, err = s.getOrCreateConnection(address, "") - require.NoError(t, err) - wg.Done() - }() - - go func() { - var err error - remoteClient, err = s.newRemoteClient() - require.NoError(t, err) - wg.Done() - }() - - wg.Wait() - require.NotNil(t, cc) - require.NotNil(t, remoteClient) } |