diff options
author | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2020-02-05 20:29:09 +0300 |
---|---|---|
committer | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2020-02-05 20:29:09 +0300 |
commit | 5e915a9ed81ff5ceb0d0a65da80d022eed14ea83 (patch) | |
tree | 7b8c6962856fb360c21ca8d66d324f7e67593d93 | |
parent | 760601790d0d2d382eb603b00e571446456f702e (diff) | |
parent | 89272d1821f804110a9753de2ed128cfdbd2bb97 (diff) |
Merge branch 'jc-reuse-cc' into 'master'
Cache and reuse client connection in ReplicateRepository
Closes #2425
See merge request gitlab-org/gitaly!1801
-rw-r--r-- | changelogs/unreleased/jc-reuse-cc.yml | 5 | ||||
-rw-r--r-- | internal/helper/storage.go | 34 | ||||
-rw-r--r-- | internal/service/repository/replicate.go | 77 | ||||
-rw-r--r-- | internal/service/repository/server.go | 6 | ||||
-rw-r--r-- | internal/service/repository/server_test.go | 66 |
5 files changed, 160 insertions, 28 deletions
diff --git a/changelogs/unreleased/jc-reuse-cc.yml b/changelogs/unreleased/jc-reuse-cc.yml new file mode 100644 index 000000000..43a9758c4 --- /dev/null +++ b/changelogs/unreleased/jc-reuse-cc.yml @@ -0,0 +1,5 @@ +--- +title: Cache and reuse client connection in ReplicateRepository +merge_request: 1801 +author: +type: fixed diff --git a/internal/helper/storage.go b/internal/helper/storage.go index 133a8a11d..6efa280e3 100644 --- a/internal/helper/storage.go +++ b/internal/helper/storage.go @@ -4,6 +4,7 @@ import ( "context" "encoding/base64" "encoding/json" + "errors" "fmt" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" @@ -37,6 +38,21 @@ func ExtractGitalyServers(ctx context.Context) (gitalyServersInfo storage.Gitaly return } +// ExtractGitalyServer extracts server information for a specific storage +func ExtractGitalyServer(ctx context.Context, storageName string) (map[string]string, error) { + gitalyServers, err := ExtractGitalyServers(ctx) + if err != nil { + return nil, err + } + + gitalyServer, ok := gitalyServers[storageName] + if !ok { + return nil, errors.New("storage name not found") + } + + return gitalyServer, nil +} + // IncomingToOutgoing creates an outgoing context out of an incoming context with the same storage metadata func IncomingToOutgoing(ctx context.Context) context.Context { md, ok := metadata.FromIncomingContext(ctx) @@ -64,24 +80,14 @@ func InjectGitalyServers(ctx context.Context, name, address, token string) (cont return metadata.NewOutgoingContext(ctx, metadata.Pairs("gitaly-servers", base64.StdEncoding.EncodeToString(gitalyServersJSON))), nil } -// ClientConnection creates a grpc.ClientConn from the injected gitaly-servers metadata -func ClientConnection(ctx context.Context, storageName string) (*grpc.ClientConn, error) { - gitalyServersInfo, err := ExtractGitalyServers(ctx) - if err != nil { - return nil, err - } - - repoStorageInfo, ok := gitalyServersInfo[storageName] - if !ok { - return nil, fmt.Errorf("gitaly server info for %q not found", storageName) - } - +// DialServer creates a client connection for a gitaly server +func DialServer(gitalyServer map[string]string) (*grpc.ClientConn, error) { connOpts := []grpc.DialOption{ grpc.WithInsecure(), - grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(repoStorageInfo["token"])), + grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(gitalyServer["token"])), } - conn, err := client.Dial(repoStorageInfo["address"], connOpts) + conn, err := client.Dial(gitalyServer["address"], connOpts) if err != nil { return nil, fmt.Errorf("could not dial source: %v", err) } diff --git a/internal/service/repository/replicate.go b/internal/service/repository/replicate.go index 56f211809..9d0d23300 100644 --- a/internal/service/repository/replicate.go +++ b/internal/service/repository/replicate.go @@ -10,6 +10,7 @@ import ( "path/filepath" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" + gitalyauth "gitlab.com/gitlab-org/gitaly/auth" "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/command" "gitlab.com/gitlab-org/gitaly/internal/config" @@ -19,6 +20,7 @@ import ( "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/streamio" "golang.org/x/sync/errgroup" + "google.golang.org/grpc" ) func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) (*gitalypb.ReplicateRepositoryResponse, error) { @@ -27,7 +29,7 @@ func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.Replicate } syncFuncs := []func(context.Context, *gitalypb.ReplicateRepositoryRequest) error{ - syncInfoAttributes, + s.syncInfoAttributes, } repoPath, err := helper.GetPath(in.GetRepository()) @@ -36,7 +38,7 @@ func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.Replicate } if helper.IsGitDirectory(repoPath) { - syncFuncs = append(syncFuncs, syncRepository) + syncFuncs = append(syncFuncs, s.syncRepository) } else { if err = s.create(ctx, in, repoPath); err != nil { return nil, helper.ErrInternal(err) @@ -112,7 +114,7 @@ func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateR return err } - repoClient, err := newRepoClient(ctx, in.GetSource().GetStorageName()) + repoClient, err := s.newRepoClient(ctx, in.GetSource().GetStorageName()) if err != nil { return err } @@ -152,8 +154,8 @@ func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateR return nil } -func syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error { - remoteClient, err := newRemoteClient() +func (s *server) syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error { + remoteClient, err := s.newRemoteClient() if err != nil { return err } @@ -168,8 +170,8 @@ func syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest return nil } -func syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error { - repoClient, err := newRepoClient(ctx, in.GetSource().GetStorageName()) +func (s *server) syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error { + repoClient, err := s.newRepoClient(ctx, in.GetSource().GetStorageName()) if err != nil { return err } @@ -218,21 +220,70 @@ func syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateRepositoryReq } // newRemoteClient creates a new RemoteClient that talks to the same gitaly server -func newRemoteClient() (gitalypb.RemoteServiceClient, error) { - conn, err := client.Dial(fmt.Sprintf("unix:%s", config.GitalyInternalSocketPath()), nil) +func (s *server) newRemoteClient() (gitalypb.RemoteServiceClient, error) { + cc, err := s.getOrCreateConnection(map[string]string{ + "address": fmt.Sprintf("unix:%s", config.GitalyInternalSocketPath()), + }) if err != nil { - return nil, fmt.Errorf("could not dial source: %v", err) + return nil, err } - return gitalypb.NewRemoteServiceClient(conn), nil + return gitalypb.NewRemoteServiceClient(cc), nil } // newRepoClient creates a new RepositoryClient that talks to the gitaly of the source repository -func newRepoClient(ctx context.Context, storageName string) (gitalypb.RepositoryServiceClient, error) { - conn, err := helper.ClientConnection(ctx, storageName) +func (s *server) newRepoClient(ctx context.Context, storageName string) (gitalypb.RepositoryServiceClient, error) { + conn, err := s.getConnectionByStorage(ctx, storageName) if err != nil { return nil, err } return gitalypb.NewRepositoryServiceClient(conn), nil } + +func (s *server) getConnectionByStorage(ctx context.Context, storageName string) (*grpc.ClientConn, error) { + gitalyServerInfo, err := helper.ExtractGitalyServer(ctx, storageName) + if err != nil { + return nil, err + } + + return s.getOrCreateConnection(gitalyServerInfo) +} + +func (s *server) getOrCreateConnection(server map[string]string) (*grpc.ClientConn, error) { + address, token := server["address"], server["token"] + if address == "" { + return nil, errors.New("address is empty") + } + + s.connsMtx.RLock() + cc, ok := s.connsByAddress[address] + s.connsMtx.RUnlock() + + if ok { + return cc, nil + } + + s.connsMtx.Lock() + defer s.connsMtx.Unlock() + + connOpts := []grpc.DialOption{grpc.WithInsecure()} + + if token != "" { + connOpts = append(connOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(token))) + } + + cc, ok = s.connsByAddress[address] + if ok { + return cc, nil + } + + cc, err := client.Dial(address, connOpts) + if err != nil { + return nil, fmt.Errorf("could not dial source: %v", err) + } + + s.connsByAddress[address] = cc + + return cc, nil +} diff --git a/internal/service/repository/server.go b/internal/service/repository/server.go index 04e2a1eda..2b4bb737a 100644 --- a/internal/service/repository/server.go +++ b/internal/service/repository/server.go @@ -2,20 +2,24 @@ package repository import ( "context" + "sync" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" ) type server struct { ruby *rubyserver.Server gitalypb.UnimplementedRepositoryServiceServer + connsByAddress map[string]*grpc.ClientConn + connsMtx sync.RWMutex } // NewServer creates a new instance of a gRPC repo server func NewServer(rs *rubyserver.Server) gitalypb.RepositoryServiceServer { - return &server{ruby: rs} + return &server{ruby: rs, connsByAddress: make(map[string]*grpc.ClientConn)} } func (*server) FetchHTTPRemote(context.Context, *gitalypb.FetchHTTPRemoteRequest) (*gitalypb.FetchHTTPRemoteResponse, error) { diff --git a/internal/service/repository/server_test.go b/internal/service/repository/server_test.go new file mode 100644 index 000000000..d113a9da4 --- /dev/null +++ b/internal/service/repository/server_test.go @@ -0,0 +1,66 @@ +package repository + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +func TestGetConnectionByStorage(t *testing.T) { + s := server{connsByAddress: make(map[string]*grpc.ClientConn)} + + ctx, cancel := testhelper.Context() + defer cancel() + + storageName, address := "default", "unix://fake/address/wont/work" + injectedCtx, err := helper.InjectGitalyServers(ctx, storageName, address, "token") + require.NoError(t, err) + + md, ok := metadata.FromOutgoingContext(injectedCtx) + require.True(t, ok) + + incomingCtx := metadata.NewIncomingContext(ctx, md) + + cc, err := s.getConnectionByStorage(incomingCtx, storageName) + require.NoError(t, err) + + cc1, err := s.getConnectionByStorage(incomingCtx, storageName) + require.NoError(t, err) + require.True(t, cc == cc1, "cc1 should be the cached copy") +} + +func TestGetConnectionsConcurrentAccess(t *testing.T) { + s := server{connsByAddress: make(map[string]*grpc.ClientConn)} + + address := "unix://fake/address/wont/work" + + var remoteClient gitalypb.RemoteServiceClient + var cc *grpc.ClientConn + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + var err error + cc, err = s.getOrCreateConnection(map[string]string{"address": address}) + require.NoError(t, err) + wg.Done() + }() + + go func() { + var err error + remoteClient, err = s.newRemoteClient() + require.NoError(t, err) + wg.Done() + }() + + wg.Wait() + require.NotNil(t, cc) + require.NotNil(t, remoteClient) +} |