Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2019-12-10 04:15:07 +0300
committerjramsay <jcai@gitlab.com>2019-12-14 04:47:07 +0300
commita0f37a4b28bb62b52a32d075098383849848571f (patch)
tree8274adad898fe67409bdb3c57cd8b21739851a99
parentc1a8ef38e514eb746bc60aa61fc2e3ff715ebacb (diff)
Add ListRepositories RPC implementationjc-list-repositories
-rw-r--r--changelogs/unreleased/jc-list-repositories.yml5
-rw-r--r--cmd/praefect/main.go2
-rw-r--r--internal/praefect/auth_test.go2
-rw-r--r--internal/praefect/datastore/datastore.go16
-rw-r--r--internal/praefect/helper_test.go35
-rw-r--r--internal/praefect/info_service_test.go132
-rw-r--r--internal/praefect/server.go15
-rw-r--r--internal/praefect/service/info/repositories.go99
-rw-r--r--internal/praefect/service/info/repository_replicas.go11
-rw-r--r--internal/praefect/service/info/server.go14
10 files changed, 312 insertions, 19 deletions
diff --git a/changelogs/unreleased/jc-list-repositories.yml b/changelogs/unreleased/jc-list-repositories.yml
new file mode 100644
index 000000000..74e25632a
--- /dev/null
+++ b/changelogs/unreleased/jc-list-repositories.yml
@@ -0,0 +1,5 @@
+---
+title: Add ListRepositories RPC implementation
+merge_request: 1692
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index b667273d7..8f662c058 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -114,7 +114,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
ds = datastore.NewInMemory(conf)
coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...)
repl = praefect.NewReplMgr("default", logger, ds, clientConnections)
- srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf)
+ srv = praefect.NewServer(ds, coordinator, repl, logger, clientConnections, conf)
serverErrors = make(chan error, 1)
)
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index f9130bfd1..1bb02e18c 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -197,7 +197,7 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
replMgr := NewReplMgr("praefect-internal-0", logEntry, ds, clientConnections)
- srv := NewServer(coordinator, replMgr, nil, logEntry, clientConnections, conf)
+ srv := NewServer(ds, coordinator, replMgr, logEntry, clientConnections, conf)
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index 378adc692..1e860812a 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -97,6 +97,8 @@ type ReplicasDatastore interface {
RemoveReplica(relativePath, nodeStorage string) error
GetRepository(relativePath string) (*models.Repository, error)
+
+ GetRepositories() ([]models.Repository, error)
}
// ReplJobsDatastore represents the behavior needed for fetching and updating
@@ -336,6 +338,20 @@ func (md *MemoryDatastore) GetRepository(relativePath string) (*models.Repositor
return &repository, nil
}
+// GetRepositories gets all repositories
+func (md *MemoryDatastore) GetRepositories() ([]models.Repository, error) {
+ md.repositories.RLock()
+ defer md.repositories.RUnlock()
+
+ var repositories []models.Repository
+
+ for _, repository := range md.repositories.m {
+ repositories = append(repositories, repository)
+ }
+
+ return repositories, nil
+}
+
// ErrReplicasMissing indicates the repository does not have any backup
// replicas
var ErrReplicasMissing = errors.New("repository missing secondary replicas")
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index f0058be82..7e78a9ebb 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -7,6 +7,8 @@ import (
"testing"
"time"
+ gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config"
+
"github.com/golang/protobuf/protoc-gen-go/descriptor"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
@@ -90,9 +92,9 @@ func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnecti
clientCC,
)
server := NewServer(
+ ds,
coordinator,
replmgr,
- nil,
l,
clientCC,
conf,
@@ -187,9 +189,9 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client
)
prf := NewServer(
+ ds,
coordinator,
replmgr,
- nil,
logEntry,
clientCC,
conf,
@@ -317,3 +319,32 @@ func newMockDownstream(tb testing.TB, token string, m mock.SimpleServiceServer)
return fmt.Sprintf("tcp://localhost:%d", port), cleanup
}
+
+// CreateNodeStorages creates a storage location for each praefect node and modifies the gitaly singleton
+// config with the paths. It returns a cleanup function that cleans up all the temp directories as well as restores
+// the original gitaly config
+func CreateNodeStorages(t *testing.T, nodes []*models.Node) func() {
+ oldConfig := gitalyconfig.Config
+
+ var tempDirCleanups []func() error
+ // for storages other than default, create a tempdir to store the repositories
+ for _, node := range nodes {
+ if node.Storage == "default" {
+ continue
+ }
+ path, cleanup := testhelper.TempDir(t, "", node.Storage)
+ tempDirCleanups = append(tempDirCleanups, cleanup)
+
+ gitalyconfig.Config.Storages = append(gitalyconfig.Config.Storages, gitalyconfig.Storage{
+ Name: node.Storage,
+ Path: path,
+ })
+ }
+
+ return func() {
+ gitalyconfig.Config = oldConfig
+ for _, cleanup := range tempDirCleanups {
+ cleanup()
+ }
+ }
+}
diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go
new file mode 100644
index 000000000..9b201f24f
--- /dev/null
+++ b/internal/praefect/info_service_test.go
@@ -0,0 +1,132 @@
+package praefect
+
+import (
+ "io"
+ "testing"
+
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestInfoService_ListRepositories(t *testing.T) {
+ testRepo, _, cleanupFn := testhelper.NewTestRepo(t)
+ defer cleanupFn()
+
+ conf := testConfig(3)
+
+ cleanup := CreateNodeStorages(t, conf.VirtualStorages[0].Nodes)
+ defer cleanup()
+
+ var primaryRepoPath string
+ for _, node := range conf.VirtualStorages[0].Nodes {
+ // we don't need to clean up the repos because the cleanup function from CreateNodeStorages
+ // will clean up the entire temp dir
+ _, destRepoPath, cleanup := cloneRepoAtStorage(t, testRepo, node.Storage)
+ defer cleanup()
+ if node.DefaultPrimary {
+ primaryRepoPath = destRepoPath
+ }
+ }
+
+ primaryRepo := *testRepo
+ primaryRepo.StorageName = conf.VirtualStorages[0].Name
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ cc, _, cleanup := runPraefectServerWithGitaly(t, conf)
+ defer cleanup()
+
+ repoClient := gitalypb.NewRepositoryServiceClient(cc)
+
+ checksumResp, err := repoClient.CalculateChecksum(ctx, &gitalypb.CalculateChecksumRequest{Repository: &primaryRepo})
+ require.NoError(t, err)
+ primaryChecksum := checksumResp.GetChecksum()
+
+ infoClient := gitalypb.NewInfoServiceClient(cc)
+ stream, err := infoClient.ListRepositories(ctx, &gitalypb.ListRepositoriesRequest{})
+ require.NoError(t, err)
+
+ responses := readListRepositoriesResponses(t, stream)
+
+ require.Len(t, responses, 1)
+ require.Equal(t, &gitalypb.ListRepositoriesResponse{
+ Primary: &gitalypb.ListRepositoriesResponse_RepositoryDetails{
+ Repository: &gitalypb.Repository{
+ StorageName: conf.VirtualStorages[0].Nodes[0].Storage,
+ RelativePath: primaryRepo.GetRelativePath(),
+ },
+ Checksum: primaryChecksum,
+ },
+ Replicas: []*gitalypb.ListRepositoriesResponse_RepositoryDetails{
+ {
+ Repository: &gitalypb.Repository{
+ StorageName: conf.VirtualStorages[0].Nodes[1].Storage,
+ RelativePath: primaryRepo.GetRelativePath(),
+ },
+ Checksum: primaryChecksum,
+ },
+ {
+ Repository: &gitalypb.Repository{
+ StorageName: conf.VirtualStorages[0].Nodes[2].Storage,
+ RelativePath: primaryRepo.GetRelativePath(),
+ },
+ Checksum: primaryChecksum,
+ },
+ },
+ }, responses[0])
+
+ // create a commit manually on the primary repo
+ testhelper.CreateCommitOnNewBranch(t, primaryRepoPath)
+ newChecksumResp, err := repoClient.CalculateChecksum(ctx, &gitalypb.CalculateChecksumRequest{Repository: &primaryRepo})
+ require.NoError(t, err)
+ oldChecksum := primaryChecksum
+
+ stream, err = infoClient.ListRepositories(ctx, &gitalypb.ListRepositoriesRequest{})
+ require.NoError(t, err)
+
+ responses = readListRepositoriesResponses(t, stream)
+
+ require.Len(t, responses, 1)
+ require.Equal(t, &gitalypb.ListRepositoriesResponse{
+ Primary: &gitalypb.ListRepositoriesResponse_RepositoryDetails{
+ Repository: &gitalypb.Repository{
+ StorageName: conf.VirtualStorages[0].Nodes[0].Storage,
+ RelativePath: primaryRepo.GetRelativePath(),
+ },
+ Checksum: newChecksumResp.GetChecksum(),
+ },
+ Replicas: []*gitalypb.ListRepositoriesResponse_RepositoryDetails{
+ {
+ Repository: &gitalypb.Repository{
+ StorageName: conf.VirtualStorages[0].Nodes[1].Storage,
+ RelativePath: primaryRepo.GetRelativePath(),
+ },
+ Checksum: oldChecksum,
+ },
+ {
+ Repository: &gitalypb.Repository{
+ StorageName: conf.VirtualStorages[0].Nodes[2].Storage,
+ RelativePath: primaryRepo.GetRelativePath(),
+ },
+ Checksum: oldChecksum,
+ },
+ },
+ }, responses[0])
+}
+
+func readListRepositoriesResponses(t *testing.T, stream gitalypb.InfoService_ListRepositoriesClient) []*gitalypb.ListRepositoriesResponse {
+ var responses []*gitalypb.ListRepositoriesResponse
+ for {
+ msg, err := stream.Recv()
+ if err != nil {
+ require.Error(t, io.EOF)
+ break
+ }
+ responses = append(responses, msg)
+ }
+
+ return responses
+}
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index ce9e0d0e4..36e2de314 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -16,7 +16,9 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/service/info"
"gitlab.com/gitlab-org/gitaly/internal/praefect/service/server"
"gitlab.com/gitlab-org/gitaly/internal/server/auth"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -34,6 +36,7 @@ type Server struct {
s *grpc.Server
conf config.Config
l *logrus.Entry
+ ds datastore.Datastore
}
func (srv *Server) warnDupeAddrs(c config.Config) {
@@ -58,7 +61,15 @@ func (srv *Server) warnDupeAddrs(c config.Config) {
// NewServer returns an initialized praefect gPRC proxy server configured
// with the provided gRPC server options
-func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry, clientConnections *conn.ClientConnections, conf config.Config) *Server {
+func NewServer(
+ datastore datastore.Datastore,
+ c *Coordinator,
+ repl ReplMgr,
+ l *logrus.Entry,
+ clientConnections *conn.ClientConnections,
+ conf config.Config,
+ grpcOpts ...grpc.ServerOption,
+) *Server {
grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector)...)
grpcOpts = append(grpcOpts, []grpc.ServerOption{
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
@@ -94,6 +105,7 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo
clientConnections: clientConnections,
conf: conf,
l: l,
+ ds: datastore,
}
s.warnDupeAddrs(conf)
@@ -117,6 +129,7 @@ func (srv *Server) Serve(l net.Listener, secure bool) error {
func (srv *Server) RegisterServices() {
// ServerServiceServer is necessary for the ServerInfo RPC
gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(srv.conf, srv.clientConnections))
+ gitalypb.RegisterInfoServiceServer(srv.s, info.NewServer(srv.conf, srv.ds, srv.clientConnections))
healthpb.RegisterHealthServer(srv.s, health.NewServer())
}
diff --git a/internal/praefect/service/info/repositories.go b/internal/praefect/service/info/repositories.go
new file mode 100644
index 000000000..438627926
--- /dev/null
+++ b/internal/praefect/service/info/repositories.go
@@ -0,0 +1,99 @@
+package info
+
+import (
+ "context"
+ grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "golang.org/x/sync/errgroup"
+ "google.golang.org/grpc"
+)
+
+// ListRepositories returns a list of repositories that includes the checksum of the primary as well as the replicas
+func (s *Server) ListRepositories(in *gitalypb.ListRepositoriesRequest, stream gitalypb.InfoService_ListRepositoriesServer) error {
+ repositories, err := s.datastore.GetRepositories()
+ if err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ for _, repository := range repositories {
+ if err = s.streamRepositoryDetails(stream, &repository); err != nil {
+ return helper.ErrInternal(err)
+ }
+ }
+
+ return nil
+}
+
+func (s *Server) streamRepositoryDetails(stream gitalypb.InfoService_ListRepositoriesServer, repository *models.Repository) error {
+ var listRepositoriesResp gitalypb.ListRepositoriesResponse
+ g, ctx := errgroup.WithContext(stream.Context())
+ cc, err := s.clientCC.GetConnection(repository.Primary.Storage)
+ if err != nil {
+ return err
+ }
+
+ // primary
+ g.Go(func() error {
+ listRepositoriesResp.Primary, err = getRepositoryDetails(
+ ctx,
+ &gitalypb.Repository{
+ StorageName: repository.Primary.Storage,
+ RelativePath: repository.RelativePath,
+ }, cc)
+
+ return err
+ })
+
+ // replicas
+ listRepositoriesResp.Replicas = make([]*gitalypb.ListRepositoriesResponse_RepositoryDetails, len(repository.Replicas))
+
+ for i, replica := range repository.Replicas {
+ i := i // rescoping
+ replica := replica // rescoping
+ cc, err := s.clientCC.GetConnection(replica.Storage)
+ if err != nil {
+ return err
+ }
+
+ g.Go(func() error {
+ listRepositoriesResp.Replicas[i], err = getRepositoryDetails(ctx, &gitalypb.Repository{
+ StorageName: replica.Storage,
+ RelativePath: repository.RelativePath,
+ }, cc)
+
+ return err
+ })
+ }
+
+ if err := g.Wait(); err != nil {
+ grpc_logrus.Extract(ctx).WithError(err).Error()
+ return nil
+ }
+
+ if err := stream.Send(&listRepositoriesResp); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func getRepositoryDetails(ctx context.Context, repo *gitalypb.Repository, cc *grpc.ClientConn) (*gitalypb.ListRepositoriesResponse_RepositoryDetails, error) {
+ client := gitalypb.NewRepositoryServiceClient(cc)
+
+ resp, err := client.CalculateChecksum(ctx,
+ &gitalypb.CalculateChecksumRequest{
+ Repository: repo,
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ return &gitalypb.ListRepositoriesResponse_RepositoryDetails{
+ Repository: repo,
+ Checksum: resp.GetChecksum(),
+ }, nil
+}
diff --git a/internal/praefect/service/info/repository_replicas.go b/internal/praefect/service/info/repository_replicas.go
deleted file mode 100644
index 31c4071b8..000000000
--- a/internal/praefect/service/info/repository_replicas.go
+++ /dev/null
@@ -1,11 +0,0 @@
-package info
-
-import (
- "gitlab.com/gitlab-org/gitaly/internal/helper"
- "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
-)
-
-// ListRepositories returns a list of repositories that includes the checksum of the primary as well as the replicas
-func (s *Server) ListRepositories(in *gitalypb.ListRepositoriesRequest, stream gitalypb.InfoService_ListRepositoriesServer) error {
- return helper.Unimplemented
-}
diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go
index 0cc75a417..febe8e879 100644
--- a/internal/praefect/service/info/server.go
+++ b/internal/praefect/service/info/server.go
@@ -2,15 +2,23 @@ package info
import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
// Server is a InfoService server
-type Server struct{}
+type Server struct {
+ datastore datastore.ReplicasDatastore
+ clientCC *conn.ClientConnections
+}
// NewServer creates a new instance of a grpc InfoServiceServer
-func NewServer(conf config.Config) gitalypb.InfoServiceServer {
- s := &Server{}
+func NewServer(conf config.Config, ds datastore.ReplicasDatastore, clientCC *conn.ClientConnections) gitalypb.InfoServiceServer {
+ s := &Server{
+ datastore: ds,
+ clientCC: clientCC,
+ }
return s
}