diff options
author | John Cai <jcai@gitlab.com> | 2019-12-10 04:15:07 +0300 |
---|---|---|
committer | jramsay <jcai@gitlab.com> | 2019-12-14 04:47:07 +0300 |
commit | a0f37a4b28bb62b52a32d075098383849848571f (patch) | |
tree | 8274adad898fe67409bdb3c57cd8b21739851a99 | |
parent | c1a8ef38e514eb746bc60aa61fc2e3ff715ebacb (diff) |
Add ListRepositories RPC implementationjc-list-repositories
-rw-r--r-- | changelogs/unreleased/jc-list-repositories.yml | 5 | ||||
-rw-r--r-- | cmd/praefect/main.go | 2 | ||||
-rw-r--r-- | internal/praefect/auth_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/datastore/datastore.go | 16 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 35 | ||||
-rw-r--r-- | internal/praefect/info_service_test.go | 132 | ||||
-rw-r--r-- | internal/praefect/server.go | 15 | ||||
-rw-r--r-- | internal/praefect/service/info/repositories.go | 99 | ||||
-rw-r--r-- | internal/praefect/service/info/repository_replicas.go | 11 | ||||
-rw-r--r-- | internal/praefect/service/info/server.go | 14 |
10 files changed, 312 insertions, 19 deletions
diff --git a/changelogs/unreleased/jc-list-repositories.yml b/changelogs/unreleased/jc-list-repositories.yml new file mode 100644 index 000000000..74e25632a --- /dev/null +++ b/changelogs/unreleased/jc-list-repositories.yml @@ -0,0 +1,5 @@ +--- +title: Add ListRepositories RPC implementation +merge_request: 1692 +author: +type: added diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index b667273d7..8f662c058 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -114,7 +114,7 @@ func run(cfgs []starter.Config, conf config.Config) error { ds = datastore.NewInMemory(conf) coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...) repl = praefect.NewReplMgr("default", logger, ds, clientConnections) - srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf) + srv = praefect.NewServer(ds, coordinator, repl, logger, clientConnections, conf) serverErrors = make(chan error, 1) ) diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index f9130bfd1..1bb02e18c 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -197,7 +197,7 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func replMgr := NewReplMgr("praefect-internal-0", logEntry, ds, clientConnections) - srv := NewServer(coordinator, replMgr, nil, logEntry, clientConnections, conf) + srv := NewServer(ds, coordinator, replMgr, logEntry, clientConnections, conf) serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go index 378adc692..1e860812a 100644 --- a/internal/praefect/datastore/datastore.go +++ b/internal/praefect/datastore/datastore.go @@ -97,6 +97,8 @@ type ReplicasDatastore interface { RemoveReplica(relativePath, nodeStorage string) error GetRepository(relativePath string) (*models.Repository, error) + + GetRepositories() ([]models.Repository, error) } // ReplJobsDatastore represents the behavior needed for fetching and updating @@ -336,6 +338,20 @@ func (md *MemoryDatastore) GetRepository(relativePath string) (*models.Repositor return &repository, nil } +// GetRepositories gets all repositories +func (md *MemoryDatastore) GetRepositories() ([]models.Repository, error) { + md.repositories.RLock() + defer md.repositories.RUnlock() + + var repositories []models.Repository + + for _, repository := range md.repositories.m { + repositories = append(repositories, repository) + } + + return repositories, nil +} + // ErrReplicasMissing indicates the repository does not have any backup // replicas var ErrReplicasMissing = errors.New("repository missing secondary replicas") diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index f0058be82..7e78a9ebb 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config" + "github.com/golang/protobuf/protoc-gen-go/descriptor" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" @@ -90,9 +92,9 @@ func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnecti clientCC, ) server := NewServer( + ds, coordinator, replmgr, - nil, l, clientCC, conf, @@ -187,9 +189,9 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client ) prf := NewServer( + ds, coordinator, replmgr, - nil, logEntry, clientCC, conf, @@ -317,3 +319,32 @@ func newMockDownstream(tb testing.TB, token string, m mock.SimpleServiceServer) return fmt.Sprintf("tcp://localhost:%d", port), cleanup } + +// CreateNodeStorages creates a storage location for each praefect node and modifies the gitaly singleton +// config with the paths. It returns a cleanup function that cleans up all the temp directories as well as restores +// the original gitaly config +func CreateNodeStorages(t *testing.T, nodes []*models.Node) func() { + oldConfig := gitalyconfig.Config + + var tempDirCleanups []func() error + // for storages other than default, create a tempdir to store the repositories + for _, node := range nodes { + if node.Storage == "default" { + continue + } + path, cleanup := testhelper.TempDir(t, "", node.Storage) + tempDirCleanups = append(tempDirCleanups, cleanup) + + gitalyconfig.Config.Storages = append(gitalyconfig.Config.Storages, gitalyconfig.Storage{ + Name: node.Storage, + Path: path, + }) + } + + return func() { + gitalyconfig.Config = oldConfig + for _, cleanup := range tempDirCleanups { + cleanup() + } + } +} diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go new file mode 100644 index 000000000..9b201f24f --- /dev/null +++ b/internal/praefect/info_service_test.go @@ -0,0 +1,132 @@ +package praefect + +import ( + "io" + "testing" + + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + + "github.com/stretchr/testify/require" +) + +func TestInfoService_ListRepositories(t *testing.T) { + testRepo, _, cleanupFn := testhelper.NewTestRepo(t) + defer cleanupFn() + + conf := testConfig(3) + + cleanup := CreateNodeStorages(t, conf.VirtualStorages[0].Nodes) + defer cleanup() + + var primaryRepoPath string + for _, node := range conf.VirtualStorages[0].Nodes { + // we don't need to clean up the repos because the cleanup function from CreateNodeStorages + // will clean up the entire temp dir + _, destRepoPath, cleanup := cloneRepoAtStorage(t, testRepo, node.Storage) + defer cleanup() + if node.DefaultPrimary { + primaryRepoPath = destRepoPath + } + } + + primaryRepo := *testRepo + primaryRepo.StorageName = conf.VirtualStorages[0].Name + + ctx, cancel := testhelper.Context() + defer cancel() + + cc, _, cleanup := runPraefectServerWithGitaly(t, conf) + defer cleanup() + + repoClient := gitalypb.NewRepositoryServiceClient(cc) + + checksumResp, err := repoClient.CalculateChecksum(ctx, &gitalypb.CalculateChecksumRequest{Repository: &primaryRepo}) + require.NoError(t, err) + primaryChecksum := checksumResp.GetChecksum() + + infoClient := gitalypb.NewInfoServiceClient(cc) + stream, err := infoClient.ListRepositories(ctx, &gitalypb.ListRepositoriesRequest{}) + require.NoError(t, err) + + responses := readListRepositoriesResponses(t, stream) + + require.Len(t, responses, 1) + require.Equal(t, &gitalypb.ListRepositoriesResponse{ + Primary: &gitalypb.ListRepositoriesResponse_RepositoryDetails{ + Repository: &gitalypb.Repository{ + StorageName: conf.VirtualStorages[0].Nodes[0].Storage, + RelativePath: primaryRepo.GetRelativePath(), + }, + Checksum: primaryChecksum, + }, + Replicas: []*gitalypb.ListRepositoriesResponse_RepositoryDetails{ + { + Repository: &gitalypb.Repository{ + StorageName: conf.VirtualStorages[0].Nodes[1].Storage, + RelativePath: primaryRepo.GetRelativePath(), + }, + Checksum: primaryChecksum, + }, + { + Repository: &gitalypb.Repository{ + StorageName: conf.VirtualStorages[0].Nodes[2].Storage, + RelativePath: primaryRepo.GetRelativePath(), + }, + Checksum: primaryChecksum, + }, + }, + }, responses[0]) + + // create a commit manually on the primary repo + testhelper.CreateCommitOnNewBranch(t, primaryRepoPath) + newChecksumResp, err := repoClient.CalculateChecksum(ctx, &gitalypb.CalculateChecksumRequest{Repository: &primaryRepo}) + require.NoError(t, err) + oldChecksum := primaryChecksum + + stream, err = infoClient.ListRepositories(ctx, &gitalypb.ListRepositoriesRequest{}) + require.NoError(t, err) + + responses = readListRepositoriesResponses(t, stream) + + require.Len(t, responses, 1) + require.Equal(t, &gitalypb.ListRepositoriesResponse{ + Primary: &gitalypb.ListRepositoriesResponse_RepositoryDetails{ + Repository: &gitalypb.Repository{ + StorageName: conf.VirtualStorages[0].Nodes[0].Storage, + RelativePath: primaryRepo.GetRelativePath(), + }, + Checksum: newChecksumResp.GetChecksum(), + }, + Replicas: []*gitalypb.ListRepositoriesResponse_RepositoryDetails{ + { + Repository: &gitalypb.Repository{ + StorageName: conf.VirtualStorages[0].Nodes[1].Storage, + RelativePath: primaryRepo.GetRelativePath(), + }, + Checksum: oldChecksum, + }, + { + Repository: &gitalypb.Repository{ + StorageName: conf.VirtualStorages[0].Nodes[2].Storage, + RelativePath: primaryRepo.GetRelativePath(), + }, + Checksum: oldChecksum, + }, + }, + }, responses[0]) +} + +func readListRepositoriesResponses(t *testing.T, stream gitalypb.InfoService_ListRepositoriesClient) []*gitalypb.ListRepositoriesResponse { + var responses []*gitalypb.ListRepositoriesResponse + for { + msg, err := stream.Recv() + if err != nil { + require.Error(t, io.EOF) + break + } + responses = append(responses, msg) + } + + return responses +} diff --git a/internal/praefect/server.go b/internal/praefect/server.go index ce9e0d0e4..36e2de314 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -16,7 +16,9 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" + "gitlab.com/gitlab-org/gitaly/internal/praefect/service/info" "gitlab.com/gitlab-org/gitaly/internal/praefect/service/server" "gitlab.com/gitlab-org/gitaly/internal/server/auth" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" @@ -34,6 +36,7 @@ type Server struct { s *grpc.Server conf config.Config l *logrus.Entry + ds datastore.Datastore } func (srv *Server) warnDupeAddrs(c config.Config) { @@ -58,7 +61,15 @@ func (srv *Server) warnDupeAddrs(c config.Config) { // NewServer returns an initialized praefect gPRC proxy server configured // with the provided gRPC server options -func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry, clientConnections *conn.ClientConnections, conf config.Config) *Server { +func NewServer( + datastore datastore.Datastore, + c *Coordinator, + repl ReplMgr, + l *logrus.Entry, + clientConnections *conn.ClientConnections, + conf config.Config, + grpcOpts ...grpc.ServerOption, +) *Server { grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector)...) grpcOpts = append(grpcOpts, []grpc.ServerOption{ grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( @@ -94,6 +105,7 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo clientConnections: clientConnections, conf: conf, l: l, + ds: datastore, } s.warnDupeAddrs(conf) @@ -117,6 +129,7 @@ func (srv *Server) Serve(l net.Listener, secure bool) error { func (srv *Server) RegisterServices() { // ServerServiceServer is necessary for the ServerInfo RPC gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(srv.conf, srv.clientConnections)) + gitalypb.RegisterInfoServiceServer(srv.s, info.NewServer(srv.conf, srv.ds, srv.clientConnections)) healthpb.RegisterHealthServer(srv.s, health.NewServer()) } diff --git a/internal/praefect/service/info/repositories.go b/internal/praefect/service/info/repositories.go new file mode 100644 index 000000000..438627926 --- /dev/null +++ b/internal/praefect/service/info/repositories.go @@ -0,0 +1,99 @@ +package info + +import ( + "context" + grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + + "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" +) + +// ListRepositories returns a list of repositories that includes the checksum of the primary as well as the replicas +func (s *Server) ListRepositories(in *gitalypb.ListRepositoriesRequest, stream gitalypb.InfoService_ListRepositoriesServer) error { + repositories, err := s.datastore.GetRepositories() + if err != nil { + return helper.ErrInternal(err) + } + + for _, repository := range repositories { + if err = s.streamRepositoryDetails(stream, &repository); err != nil { + return helper.ErrInternal(err) + } + } + + return nil +} + +func (s *Server) streamRepositoryDetails(stream gitalypb.InfoService_ListRepositoriesServer, repository *models.Repository) error { + var listRepositoriesResp gitalypb.ListRepositoriesResponse + g, ctx := errgroup.WithContext(stream.Context()) + cc, err := s.clientCC.GetConnection(repository.Primary.Storage) + if err != nil { + return err + } + + // primary + g.Go(func() error { + listRepositoriesResp.Primary, err = getRepositoryDetails( + ctx, + &gitalypb.Repository{ + StorageName: repository.Primary.Storage, + RelativePath: repository.RelativePath, + }, cc) + + return err + }) + + // replicas + listRepositoriesResp.Replicas = make([]*gitalypb.ListRepositoriesResponse_RepositoryDetails, len(repository.Replicas)) + + for i, replica := range repository.Replicas { + i := i // rescoping + replica := replica // rescoping + cc, err := s.clientCC.GetConnection(replica.Storage) + if err != nil { + return err + } + + g.Go(func() error { + listRepositoriesResp.Replicas[i], err = getRepositoryDetails(ctx, &gitalypb.Repository{ + StorageName: replica.Storage, + RelativePath: repository.RelativePath, + }, cc) + + return err + }) + } + + if err := g.Wait(); err != nil { + grpc_logrus.Extract(ctx).WithError(err).Error() + return nil + } + + if err := stream.Send(&listRepositoriesResp); err != nil { + return err + } + + return nil +} + +func getRepositoryDetails(ctx context.Context, repo *gitalypb.Repository, cc *grpc.ClientConn) (*gitalypb.ListRepositoriesResponse_RepositoryDetails, error) { + client := gitalypb.NewRepositoryServiceClient(cc) + + resp, err := client.CalculateChecksum(ctx, + &gitalypb.CalculateChecksumRequest{ + Repository: repo, + }) + if err != nil { + return nil, err + } + + return &gitalypb.ListRepositoriesResponse_RepositoryDetails{ + Repository: repo, + Checksum: resp.GetChecksum(), + }, nil +} diff --git a/internal/praefect/service/info/repository_replicas.go b/internal/praefect/service/info/repository_replicas.go deleted file mode 100644 index 31c4071b8..000000000 --- a/internal/praefect/service/info/repository_replicas.go +++ /dev/null @@ -1,11 +0,0 @@ -package info - -import ( - "gitlab.com/gitlab-org/gitaly/internal/helper" - "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" -) - -// ListRepositories returns a list of repositories that includes the checksum of the primary as well as the replicas -func (s *Server) ListRepositories(in *gitalypb.ListRepositoriesRequest, stream gitalypb.InfoService_ListRepositoriesServer) error { - return helper.Unimplemented -} diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go index 0cc75a417..febe8e879 100644 --- a/internal/praefect/service/info/server.go +++ b/internal/praefect/service/info/server.go @@ -2,15 +2,23 @@ package info import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) // Server is a InfoService server -type Server struct{} +type Server struct { + datastore datastore.ReplicasDatastore + clientCC *conn.ClientConnections +} // NewServer creates a new instance of a grpc InfoServiceServer -func NewServer(conf config.Config) gitalypb.InfoServiceServer { - s := &Server{} +func NewServer(conf config.Config, ds datastore.ReplicasDatastore, clientCC *conn.ClientConnections) gitalypb.InfoServiceServer { + s := &Server{ + datastore: ds, + clientCC: clientCC, + } return s } |