Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZeger-Jan van de Weg <git@zjvandeweg.nl>2020-02-05 20:29:09 +0300
committerZeger-Jan van de Weg <git@zjvandeweg.nl>2020-02-05 20:29:09 +0300
commit5e915a9ed81ff5ceb0d0a65da80d022eed14ea83 (patch)
tree7b8c6962856fb360c21ca8d66d324f7e67593d93
parent760601790d0d2d382eb603b00e571446456f702e (diff)
parent89272d1821f804110a9753de2ed128cfdbd2bb97 (diff)
Merge branch 'jc-reuse-cc' into 'master'
Cache and reuse client connection in ReplicateRepository Closes #2425 See merge request gitlab-org/gitaly!1801
-rw-r--r--changelogs/unreleased/jc-reuse-cc.yml5
-rw-r--r--internal/helper/storage.go34
-rw-r--r--internal/service/repository/replicate.go77
-rw-r--r--internal/service/repository/server.go6
-rw-r--r--internal/service/repository/server_test.go66
5 files changed, 160 insertions, 28 deletions
diff --git a/changelogs/unreleased/jc-reuse-cc.yml b/changelogs/unreleased/jc-reuse-cc.yml
new file mode 100644
index 000000000..43a9758c4
--- /dev/null
+++ b/changelogs/unreleased/jc-reuse-cc.yml
@@ -0,0 +1,5 @@
+---
+title: Cache and reuse client connection in ReplicateRepository
+merge_request: 1801
+author:
+type: fixed
diff --git a/internal/helper/storage.go b/internal/helper/storage.go
index 133a8a11d..6efa280e3 100644
--- a/internal/helper/storage.go
+++ b/internal/helper/storage.go
@@ -4,6 +4,7 @@ import (
"context"
"encoding/base64"
"encoding/json"
+ "errors"
"fmt"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
@@ -37,6 +38,21 @@ func ExtractGitalyServers(ctx context.Context) (gitalyServersInfo storage.Gitaly
return
}
+// ExtractGitalyServer extracts server information for a specific storage
+func ExtractGitalyServer(ctx context.Context, storageName string) (map[string]string, error) {
+ gitalyServers, err := ExtractGitalyServers(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ gitalyServer, ok := gitalyServers[storageName]
+ if !ok {
+ return nil, errors.New("storage name not found")
+ }
+
+ return gitalyServer, nil
+}
+
// IncomingToOutgoing creates an outgoing context out of an incoming context with the same storage metadata
func IncomingToOutgoing(ctx context.Context) context.Context {
md, ok := metadata.FromIncomingContext(ctx)
@@ -64,24 +80,14 @@ func InjectGitalyServers(ctx context.Context, name, address, token string) (cont
return metadata.NewOutgoingContext(ctx, metadata.Pairs("gitaly-servers", base64.StdEncoding.EncodeToString(gitalyServersJSON))), nil
}
-// ClientConnection creates a grpc.ClientConn from the injected gitaly-servers metadata
-func ClientConnection(ctx context.Context, storageName string) (*grpc.ClientConn, error) {
- gitalyServersInfo, err := ExtractGitalyServers(ctx)
- if err != nil {
- return nil, err
- }
-
- repoStorageInfo, ok := gitalyServersInfo[storageName]
- if !ok {
- return nil, fmt.Errorf("gitaly server info for %q not found", storageName)
- }
-
+// DialServer creates a client connection for a gitaly server
+func DialServer(gitalyServer map[string]string) (*grpc.ClientConn, error) {
connOpts := []grpc.DialOption{
grpc.WithInsecure(),
- grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(repoStorageInfo["token"])),
+ grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(gitalyServer["token"])),
}
- conn, err := client.Dial(repoStorageInfo["address"], connOpts)
+ conn, err := client.Dial(gitalyServer["address"], connOpts)
if err != nil {
return nil, fmt.Errorf("could not dial source: %v", err)
}
diff --git a/internal/service/repository/replicate.go b/internal/service/repository/replicate.go
index 56f211809..9d0d23300 100644
--- a/internal/service/repository/replicate.go
+++ b/internal/service/repository/replicate.go
@@ -10,6 +10,7 @@ import (
"path/filepath"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
+ gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
"gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/command"
"gitlab.com/gitlab-org/gitaly/internal/config"
@@ -19,6 +20,7 @@ import (
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/streamio"
"golang.org/x/sync/errgroup"
+ "google.golang.org/grpc"
)
func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) (*gitalypb.ReplicateRepositoryResponse, error) {
@@ -27,7 +29,7 @@ func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.Replicate
}
syncFuncs := []func(context.Context, *gitalypb.ReplicateRepositoryRequest) error{
- syncInfoAttributes,
+ s.syncInfoAttributes,
}
repoPath, err := helper.GetPath(in.GetRepository())
@@ -36,7 +38,7 @@ func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.Replicate
}
if helper.IsGitDirectory(repoPath) {
- syncFuncs = append(syncFuncs, syncRepository)
+ syncFuncs = append(syncFuncs, s.syncRepository)
} else {
if err = s.create(ctx, in, repoPath); err != nil {
return nil, helper.ErrInternal(err)
@@ -112,7 +114,7 @@ func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateR
return err
}
- repoClient, err := newRepoClient(ctx, in.GetSource().GetStorageName())
+ repoClient, err := s.newRepoClient(ctx, in.GetSource().GetStorageName())
if err != nil {
return err
}
@@ -152,8 +154,8 @@ func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateR
return nil
}
-func syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
- remoteClient, err := newRemoteClient()
+func (s *server) syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
+ remoteClient, err := s.newRemoteClient()
if err != nil {
return err
}
@@ -168,8 +170,8 @@ func syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest
return nil
}
-func syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
- repoClient, err := newRepoClient(ctx, in.GetSource().GetStorageName())
+func (s *server) syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
+ repoClient, err := s.newRepoClient(ctx, in.GetSource().GetStorageName())
if err != nil {
return err
}
@@ -218,21 +220,70 @@ func syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateRepositoryReq
}
// newRemoteClient creates a new RemoteClient that talks to the same gitaly server
-func newRemoteClient() (gitalypb.RemoteServiceClient, error) {
- conn, err := client.Dial(fmt.Sprintf("unix:%s", config.GitalyInternalSocketPath()), nil)
+func (s *server) newRemoteClient() (gitalypb.RemoteServiceClient, error) {
+ cc, err := s.getOrCreateConnection(map[string]string{
+ "address": fmt.Sprintf("unix:%s", config.GitalyInternalSocketPath()),
+ })
if err != nil {
- return nil, fmt.Errorf("could not dial source: %v", err)
+ return nil, err
}
- return gitalypb.NewRemoteServiceClient(conn), nil
+ return gitalypb.NewRemoteServiceClient(cc), nil
}
// newRepoClient creates a new RepositoryClient that talks to the gitaly of the source repository
-func newRepoClient(ctx context.Context, storageName string) (gitalypb.RepositoryServiceClient, error) {
- conn, err := helper.ClientConnection(ctx, storageName)
+func (s *server) newRepoClient(ctx context.Context, storageName string) (gitalypb.RepositoryServiceClient, error) {
+ conn, err := s.getConnectionByStorage(ctx, storageName)
if err != nil {
return nil, err
}
return gitalypb.NewRepositoryServiceClient(conn), nil
}
+
+func (s *server) getConnectionByStorage(ctx context.Context, storageName string) (*grpc.ClientConn, error) {
+ gitalyServerInfo, err := helper.ExtractGitalyServer(ctx, storageName)
+ if err != nil {
+ return nil, err
+ }
+
+ return s.getOrCreateConnection(gitalyServerInfo)
+}
+
+func (s *server) getOrCreateConnection(server map[string]string) (*grpc.ClientConn, error) {
+ address, token := server["address"], server["token"]
+ if address == "" {
+ return nil, errors.New("address is empty")
+ }
+
+ s.connsMtx.RLock()
+ cc, ok := s.connsByAddress[address]
+ s.connsMtx.RUnlock()
+
+ if ok {
+ return cc, nil
+ }
+
+ s.connsMtx.Lock()
+ defer s.connsMtx.Unlock()
+
+ connOpts := []grpc.DialOption{grpc.WithInsecure()}
+
+ if token != "" {
+ connOpts = append(connOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(token)))
+ }
+
+ cc, ok = s.connsByAddress[address]
+ if ok {
+ return cc, nil
+ }
+
+ cc, err := client.Dial(address, connOpts)
+ if err != nil {
+ return nil, fmt.Errorf("could not dial source: %v", err)
+ }
+
+ s.connsByAddress[address] = cc
+
+ return cc, nil
+}
diff --git a/internal/service/repository/server.go b/internal/service/repository/server.go
index 04e2a1eda..2b4bb737a 100644
--- a/internal/service/repository/server.go
+++ b/internal/service/repository/server.go
@@ -2,20 +2,24 @@ package repository
import (
"context"
+ "sync"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc"
)
type server struct {
ruby *rubyserver.Server
gitalypb.UnimplementedRepositoryServiceServer
+ connsByAddress map[string]*grpc.ClientConn
+ connsMtx sync.RWMutex
}
// NewServer creates a new instance of a gRPC repo server
func NewServer(rs *rubyserver.Server) gitalypb.RepositoryServiceServer {
- return &server{ruby: rs}
+ return &server{ruby: rs, connsByAddress: make(map[string]*grpc.ClientConn)}
}
func (*server) FetchHTTPRemote(context.Context, *gitalypb.FetchHTTPRemoteRequest) (*gitalypb.FetchHTTPRemoteResponse, error) {
diff --git a/internal/service/repository/server_test.go b/internal/service/repository/server_test.go
new file mode 100644
index 000000000..d113a9da4
--- /dev/null
+++ b/internal/service/repository/server_test.go
@@ -0,0 +1,66 @@
+package repository
+
+import (
+ "sync"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+)
+
+func TestGetConnectionByStorage(t *testing.T) {
+ s := server{connsByAddress: make(map[string]*grpc.ClientConn)}
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ storageName, address := "default", "unix://fake/address/wont/work"
+ injectedCtx, err := helper.InjectGitalyServers(ctx, storageName, address, "token")
+ require.NoError(t, err)
+
+ md, ok := metadata.FromOutgoingContext(injectedCtx)
+ require.True(t, ok)
+
+ incomingCtx := metadata.NewIncomingContext(ctx, md)
+
+ cc, err := s.getConnectionByStorage(incomingCtx, storageName)
+ require.NoError(t, err)
+
+ cc1, err := s.getConnectionByStorage(incomingCtx, storageName)
+ require.NoError(t, err)
+ require.True(t, cc == cc1, "cc1 should be the cached copy")
+}
+
+func TestGetConnectionsConcurrentAccess(t *testing.T) {
+ s := server{connsByAddress: make(map[string]*grpc.ClientConn)}
+
+ address := "unix://fake/address/wont/work"
+
+ var remoteClient gitalypb.RemoteServiceClient
+ var cc *grpc.ClientConn
+
+ var wg sync.WaitGroup
+ wg.Add(2)
+
+ go func() {
+ var err error
+ cc, err = s.getOrCreateConnection(map[string]string{"address": address})
+ require.NoError(t, err)
+ wg.Done()
+ }()
+
+ go func() {
+ var err error
+ remoteClient, err = s.newRemoteClient()
+ require.NoError(t, err)
+ wg.Done()
+ }()
+
+ wg.Wait()
+ require.NotNil(t, cc)
+ require.NotNil(t, remoteClient)
+}