Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2020-07-03 10:01:19 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2020-07-06 09:23:05 +0300
commit84e2e8eee975547cb24871215313a955b0c8f8ee (patch)
treea31af4f60e94af4e926fed52a8649bf1e2607a83
parent1f267efcf53c05582922b4b4ce83318456c0b181 (diff)
repository: Use new connection pool interface
Now that we've got a connection pooling interface available that manages GRPC connections for us, let's use that one instead of custom-coding our own.
-rw-r--r--internal/service/repository/clone_from_pool_internal.go2
-rw-r--r--internal/service/repository/replicate.go58
-rw-r--r--internal/service/repository/server.go13
-rw-r--r--internal/service/repository/server_test.go45
4 files changed, 22 insertions, 96 deletions
diff --git a/internal/service/repository/clone_from_pool_internal.go b/internal/service/repository/clone_from_pool_internal.go
index 169357541..2e5d99779 100644
--- a/internal/service/repository/clone_from_pool_internal.go
+++ b/internal/service/repository/clone_from_pool_internal.go
@@ -26,7 +26,7 @@ func (s *server) CloneFromPoolInternal(ctx context.Context, req *gitalypb.CloneF
return nil, helper.ErrInternal(err)
}
- client, err := s.newRemoteClient()
+ client, err := s.newRemoteClient(ctx)
if err != nil {
return nil, helper.ErrInternalf("getting remote service client: %v", err)
}
diff --git a/internal/service/repository/replicate.go b/internal/service/repository/replicate.go
index ba15ebdeb..75a7508fc 100644
--- a/internal/service/repository/replicate.go
+++ b/internal/service/repository/replicate.go
@@ -10,8 +10,6 @@ import (
"path/filepath"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
- gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
- "gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/command"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/safe"
@@ -20,7 +18,6 @@ import (
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/streamio"
"golang.org/x/sync/errgroup"
- "google.golang.org/grpc"
)
func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) (*gitalypb.ReplicateRepositoryResponse, error) {
@@ -154,7 +151,7 @@ func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateR
}
func (s *server) syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
- remoteClient, err := s.newRemoteClient()
+ remoteClient, err := s.newRemoteClient(ctx)
if err != nil {
return err
}
@@ -224,67 +221,26 @@ func (s *server) syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateR
}
// newRemoteClient creates a new RemoteClient that talks to the same gitaly server
-func (s *server) newRemoteClient() (gitalypb.RemoteServiceClient, error) {
- cc, err := s.getOrCreateConnection(fmt.Sprintf("unix:%s", s.internalGitalySocket), "")
+func (s *server) newRemoteClient(ctx context.Context) (gitalypb.RemoteServiceClient, error) {
+ conn, err := s.conns.Dial(ctx, fmt.Sprintf("unix:%s", s.internalGitalySocket), "")
if err != nil {
return nil, err
}
- return gitalypb.NewRemoteServiceClient(cc), nil
+ return gitalypb.NewRemoteServiceClient(conn), nil
}
// newRepoClient creates a new RepositoryClient that talks to the gitaly of the source repository
func (s *server) newRepoClient(ctx context.Context, storageName string) (gitalypb.RepositoryServiceClient, error) {
- conn, err := s.getConnectionByStorage(ctx, storageName)
- if err != nil {
- return nil, err
- }
-
- return gitalypb.NewRepositoryServiceClient(conn), nil
-}
-
-func (s *server) getConnectionByStorage(ctx context.Context, storageName string) (*grpc.ClientConn, error) {
gitalyServerInfo, err := helper.ExtractGitalyServer(ctx, storageName)
if err != nil {
return nil, err
}
- return s.getOrCreateConnection(gitalyServerInfo["address"], gitalyServerInfo["token"])
-}
-
-func (s *server) getOrCreateConnection(address, token string) (*grpc.ClientConn, error) {
- if address == "" {
- return nil, errors.New("address is empty")
- }
-
- s.connsMtx.RLock()
- cc, ok := s.connsByAddress[address]
- s.connsMtx.RUnlock()
-
- if ok {
- return cc, nil
- }
-
- s.connsMtx.Lock()
- defer s.connsMtx.Unlock()
-
- connOpts := []grpc.DialOption{grpc.WithInsecure()}
-
- if token != "" {
- connOpts = append(connOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token)))
- }
-
- cc, ok = s.connsByAddress[address]
- if ok {
- return cc, nil
- }
-
- cc, err := client.Dial(address, connOpts)
+ conn, err := s.conns.Dial(ctx, gitalyServerInfo["address"], gitalyServerInfo["token"])
if err != nil {
- return nil, fmt.Errorf("could not dial source: %v", err)
+ return nil, err
}
- s.connsByAddress[address] = cc
-
- return cc, nil
+ return gitalypb.NewRepositoryServiceClient(conn), nil
}
diff --git a/internal/service/repository/server.go b/internal/service/repository/server.go
index 4a40cc28f..f6523ceea 100644
--- a/internal/service/repository/server.go
+++ b/internal/service/repository/server.go
@@ -2,27 +2,30 @@ package repository
import (
"context"
- "sync"
+ "gitlab.com/gitlab-org/gitaly/internal/connection"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
"gitlab.com/gitlab-org/gitaly/internal/storage"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
- "google.golang.org/grpc"
)
type server struct {
ruby *rubyserver.Server
gitalypb.UnimplementedRepositoryServiceServer
- connsByAddress map[string]*grpc.ClientConn
- connsMtx sync.RWMutex
+ conns *connection.Pool
internalGitalySocket string
locator storage.Locator
}
// NewServer creates a new instance of a gRPC repo server
func NewServer(rs *rubyserver.Server, locator storage.Locator, internalGitalySocket string) gitalypb.RepositoryServiceServer {
- return &server{ruby: rs, locator: locator, connsByAddress: make(map[string]*grpc.ClientConn), internalGitalySocket: internalGitalySocket}
+ return &server{
+ ruby: rs,
+ locator: locator,
+ conns: connection.NewPool(),
+ internalGitalySocket: internalGitalySocket,
+ }
}
func (*server) FetchHTTPRemote(context.Context, *gitalypb.FetchHTTPRemoteRequest) (*gitalypb.FetchHTTPRemoteResponse, error) {
diff --git a/internal/service/repository/server_test.go b/internal/service/repository/server_test.go
index 6ef9b0939..434db0a7d 100644
--- a/internal/service/repository/server_test.go
+++ b/internal/service/repository/server_test.go
@@ -1,19 +1,20 @@
package repository
import (
- "sync"
"testing"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/connection"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
- "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
- "google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
func TestGetConnectionByStorage(t *testing.T) {
- s := server{connsByAddress: make(map[string]*grpc.ClientConn)}
+ connPool := connection.NewPool()
+ defer connPool.Close()
+
+ s := server{conns: connPool}
ctx, cancel := testhelper.Context()
defer cancel()
@@ -27,40 +28,6 @@ func TestGetConnectionByStorage(t *testing.T) {
incomingCtx := metadata.NewIncomingContext(ctx, md)
- cc, err := s.getConnectionByStorage(incomingCtx, storageName)
- require.NoError(t, err)
-
- cc1, err := s.getConnectionByStorage(incomingCtx, storageName)
+ _, err = s.newRepoClient(incomingCtx, storageName)
require.NoError(t, err)
- require.True(t, cc == cc1, "cc1 should be the cached copy")
-}
-
-func TestGetConnectionsConcurrentAccess(t *testing.T) {
- s := server{connsByAddress: make(map[string]*grpc.ClientConn)}
-
- address := "unix://fake/address/wont/work"
-
- var remoteClient gitalypb.RemoteServiceClient
- var cc *grpc.ClientConn
-
- var wg sync.WaitGroup
- wg.Add(2)
-
- go func() {
- var err error
- cc, err = s.getOrCreateConnection(address, "")
- require.NoError(t, err)
- wg.Done()
- }()
-
- go func() {
- var err error
- remoteClient, err = s.newRemoteClient()
- require.NoError(t, err)
- wg.Done()
- }()
-
- wg.Wait()
- require.NotNil(t, cc)
- require.NotNil(t, remoteClient)
}