Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2020-03-10 03:06:19 +0300
committerJohn Cai <jcai@gitlab.com>2020-03-10 21:16:54 +0300
commit3181a2f6af9c3ae6c9926ff2bdf1de5f3842ade3 (patch)
treeca0539fe19b1c44608d3754cd0e285696c4b355c
parentcd501ca5ad38654b39c1d3c3908f4332e08f85d5 (diff)
Handle WriteRef in praefect with strong consistencyjc-writeref-strong-consistency
-rw-r--r--cmd/praefect/main.go1
-rw-r--r--internal/praefect/helper_test.go22
-rw-r--r--internal/praefect/mutator_test.go247
-rw-r--r--internal/praefect/replicator.go2
-rw-r--r--internal/praefect/server.go9
-rw-r--r--internal/praefect/service/repository/server.go18
-rw-r--r--internal/praefect/service/repository/write_ref.go85
7 files changed, 381 insertions, 3 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index ae81f1680..33f65a8eb 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -184,6 +184,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
}
srv.RegisterServices(nodeManager, conf)
+ srv.RegisterMutatorMethods(nodeManager, ds)
b.StopAction = srv.GracefulStop
for _, cfg := range cfgs {
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index b87e404db..e5c67012f 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -22,8 +22,12 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
"gitlab.com/gitlab-org/gitaly/internal/server/auth"
+ "gitlab.com/gitlab-org/gitaly/internal/service/objectpool"
+ "gitlab.com/gitlab-org/gitaly/internal/service/ref"
+ "gitlab.com/gitlab-org/gitaly/internal/service/remote"
"gitlab.com/gitlab-org/gitaly/internal/service/repository"
gitalyserver "gitlab.com/gitlab-org/gitaly/internal/service/server"
+ "gitlab.com/gitlab-org/gitaly/internal/service/ssh"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -158,9 +162,14 @@ func noopBackoffFunc() (backoff, backoffReset) {
}, func() {}
}
+type PraefectComponents struct {
+ NodeManager nodes.Manager
+ Datastore datastore.Datastore
+}
+
// runPraefectServerWithGitaly runs a praefect server with actual Gitaly nodes
// requires exactly 1 virtual storage
-func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
+func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, PraefectComponents, testhelper.Cleanup) {
require.Len(t, conf.VirtualStorages, 1)
var cleanups []testhelper.Cleanup
@@ -205,6 +214,8 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client
ctx, cancel := testhelper.Context()
prf.RegisterServices(nodeMgr, conf)
+ prf.RegisterMutatorMethods(nodeMgr, ds)
+
go func() { errQ <- prf.Serve(listener, false) }()
go func() { errQ <- replmgr.ProcessBacklog(ctx, noopBackoffFunc) }()
@@ -224,7 +235,10 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client
require.Error(t, context.Canceled, <-errQ)
}
- return cc, prf, cleanup
+ return cc, PraefectComponents{
+ NodeManager: nodeMgr,
+ Datastore: ds,
+ }, cleanup
}
func runInternalGitalyServer(t *testing.T, token string) (*grpc.Server, string, func()) {
@@ -246,6 +260,10 @@ func runInternalGitalyServer(t *testing.T, token string) (*grpc.Server, string,
gitalypb.RegisterServerServiceServer(server, gitalyserver.NewServer())
gitalypb.RegisterRepositoryServiceServer(server, repository.NewServer(rubyServer, internalSocket))
+ gitalypb.RegisterRefServiceServer(server, ref.NewServer())
+ gitalypb.RegisterRemoteServiceServer(server, remote.NewServer(rubyServer))
+ gitalypb.RegisterObjectPoolServiceServer(server, objectpool.NewServer())
+ gitalypb.RegisterSSHServiceServer(server, ssh.NewServer())
healthpb.RegisterHealthServer(server, health.NewServer())
errQ := make(chan error)
diff --git a/internal/praefect/mutator_test.go b/internal/praefect/mutator_test.go
new file mode 100644
index 000000000..63537522d
--- /dev/null
+++ b/internal/praefect/mutator_test.go
@@ -0,0 +1,247 @@
+package praefect
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+)
+
+func TestRepoService_WriteRef_Success(t *testing.T) {
+ conf := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "default",
+ Nodes: []*models.Node{
+ {
+ Storage: "praefect-internal-1",
+ DefaultPrimary: true,
+ },
+ {
+ Storage: "praefect-internal-2",
+ },
+ {
+ Storage: "praefect-internal-3",
+ },
+ },
+ },
+ },
+ }
+
+ testRepo, testRepoPath, cleanup := testhelper.NewTestRepo(t)
+ defer cleanup()
+
+ newRev, _ := testhelper.CreateCommitOnNewBranch(t, testRepoPath)
+
+ cleanupTempStoragePaths := createTempStoragePaths(t, conf.VirtualStorages[0].Nodes)
+ defer cleanupTempStoragePaths()
+
+ for _, node := range conf.VirtualStorages[0].Nodes {
+ cloneRepoAtStorage(t, testRepo, node.Storage)
+ }
+
+ cc, components, cleanup := runPraefectServerWithGitaly(t, conf)
+ defer cleanup()
+
+ nodeMgr := components.NodeManager
+
+ shard, err := nodeMgr.GetShard("default")
+ require.NoError(t, err)
+
+ primary, err := shard.GetPrimary()
+ require.NoError(t, err)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ ref := "refs/heads/master"
+ branch, err := gitalypb.NewRefServiceClient(primary.GetConnection()).FindBranch(ctx, &gitalypb.FindBranchRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: primary.GetStorage(),
+ RelativePath: testRepo.GetRelativePath(),
+ },
+ Name: []byte(ref),
+ })
+ require.NoError(t, err)
+
+ oldRev := branch.Branch.TargetCommit.Id
+
+ _, err = gitalypb.NewRepositoryServiceClient(cc).WriteRef(ctx, &gitalypb.WriteRefRequest{
+ Repository: testRepo,
+ Ref: []byte(ref),
+ Revision: []byte(newRev),
+ OldRevision: []byte(oldRev),
+ Force: false,
+ })
+ require.NoError(t, err)
+
+ secondaries, err := shard.GetSecondaries()
+ require.NoError(t, err)
+
+ for _, secondary := range secondaries {
+ branch, err = gitalypb.NewRefServiceClient(secondary.GetConnection()).FindBranch(ctx, &gitalypb.FindBranchRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: secondary.GetStorage(),
+ RelativePath: testRepo.GetRelativePath(),
+ },
+ Name: []byte(ref),
+ })
+ require.NoError(t, err)
+ require.Equal(t, newRev, branch.GetBranch().TargetCommit.Id)
+ }
+}
+
+func TestRepoService_WriteRef_Replication(t *testing.T) {
+ conf := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "default",
+ Nodes: []*models.Node{
+ {
+ Storage: "praefect-internal-1",
+ DefaultPrimary: true,
+ },
+ {
+ Storage: "praefect-internal-2",
+ },
+ {
+ Storage: "praefect-internal-3",
+ },
+ },
+ },
+ },
+ }
+
+ testRepo, _, cleanup := testhelper.NewTestRepo(t)
+ defer cleanup()
+
+ cleanupTempStoragePaths := createTempStoragePaths(t, conf.VirtualStorages[0].Nodes)
+ defer cleanupTempStoragePaths()
+
+ for _, node := range conf.VirtualStorages[0].Nodes {
+ cloneRepoAtStorage(t, testRepo, node.Storage)
+ }
+
+ primaryRepoPath, err := helper.GetRepoPath(&gitalypb.Repository{
+ StorageName: conf.VirtualStorages[0].Nodes[0].Storage,
+ RelativePath: testRepo.GetRelativePath(),
+ })
+ require.NoError(t, err)
+
+ newRev, _ := testhelper.CreateCommitOnNewBranch(t, primaryRepoPath)
+
+ cc, components, cleanupServers := runPraefectServerWithGitaly(t, conf)
+ defer cleanupServers()
+
+ nodeMgr := components.NodeManager
+
+ shard, err := nodeMgr.GetShard("default")
+ require.NoError(t, err)
+
+ primary, err := shard.GetPrimary()
+ require.NoError(t, err)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ ref := "refs/heads/master"
+ branch, err := gitalypb.NewRefServiceClient(primary.GetConnection()).FindBranch(ctx, &gitalypb.FindBranchRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: primary.GetStorage(),
+ RelativePath: testRepo.GetRelativePath(),
+ },
+ Name: []byte(ref),
+ })
+ require.NoError(t, err)
+
+ oldRev := branch.Branch.TargetCommit.Id
+
+ _, err = gitalypb.NewRepositoryServiceClient(cc).WriteRef(ctx, &gitalypb.WriteRefRequest{
+ Repository: testRepo,
+ Ref: []byte(ref),
+ Revision: []byte(newRev),
+ OldRevision: []byte(oldRev),
+ Force: false,
+ })
+ require.NoError(t, err)
+
+ secondaries, err := shard.GetSecondaries()
+ require.NoError(t, err)
+
+ for _, secondary := range secondaries {
+ branch, err = gitalypb.NewRefServiceClient(secondary.GetConnection()).FindBranch(ctx, &gitalypb.FindBranchRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: secondary.GetStorage(),
+ RelativePath: testRepo.GetRelativePath(),
+ },
+ Name: []byte(ref),
+ })
+ require.NoError(t, err)
+ require.NotEqual(t, newRev, branch.GetBranch().TargetCommit.Id, "write ref should have failed on the secondaries")
+ }
+
+ timer := time.NewTimer(20 * time.Second)
+ defer timer.Stop()
+
+Test:
+ for {
+ select {
+ case <-timer.C:
+ t.Fatal("time limit expired and jobs have not been completed")
+ default:
+ secondary1Jobs, err := components.Datastore.GetJobs(
+ datastore.JobStateComplete|datastore.JobStateInProgress|datastore.JobStateDead|datastore.JobStateReady|datastore.JobStatePending,
+ conf.VirtualStorages[0].Nodes[1].Storage, 1)
+ require.NoError(t, err)
+ secondary2Jobs, err := components.Datastore.GetJobs(
+ datastore.JobStateComplete|datastore.JobStateInProgress|datastore.JobStateDead|datastore.JobStateReady|datastore.JobStatePending,
+ conf.VirtualStorages[0].Nodes[2].Storage, 1)
+ require.NoError(t, err)
+ if len(secondary1Jobs) == 0 && len(secondary2Jobs) == 0 {
+ break Test
+ }
+ <-time.Tick(100 * time.Millisecond)
+ }
+ }
+
+ for _, secondary := range secondaries {
+ branch, err = gitalypb.NewRefServiceClient(secondary.GetConnection()).FindBranch(ctx, &gitalypb.FindBranchRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: secondary.GetStorage(),
+ RelativePath: testRepo.GetRelativePath(),
+ },
+ Name: []byte(ref),
+ })
+ require.NoError(t, err)
+ require.Equal(t, newRev, branch.GetBranch().TargetCommit.Id, "write ref should have failed on the secondaries")
+ }
+}
+
+func createTempStoragePaths(t *testing.T, nodes []*models.Node) func() {
+ oldStorages := gitalyconfig.Config.Storages
+
+ var tempDirCleanups []func() error
+ for _, node := range nodes {
+ tempPath, cleanup := testhelper.TempDir(t, node.Storage)
+ tempDirCleanups = append(tempDirCleanups, cleanup)
+
+ gitalyconfig.Config.Storages = append(gitalyconfig.Config.Storages, gitalyconfig.Storage{
+ Name: node.Storage,
+ Path: tempPath,
+ })
+ }
+
+ return func() {
+ gitalyconfig.Config.Storages = oldStorages
+ for _, tempDirCleanup := range tempDirCleanups {
+ tempDirCleanup()
+ }
+ }
+}
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 769d3a56c..b9c40474b 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -48,7 +48,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, job datastore.ReplJob
Source: sourceRepository,
Repository: targetRepository,
}); err != nil {
- return fmt.Errorf("failed to create repository: %v", err)
+ return fmt.Errorf("failed to replicate repository: %v", err)
}
// check if the repository has an object pool
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 41f21974e..06b228fc7 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -17,11 +17,13 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler"
"gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/middleware"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/praefect/service/info"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/service/repository"
"gitlab.com/gitlab-org/gitaly/internal/praefect/service/server"
"gitlab.com/gitlab-org/gitaly/internal/server/auth"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -131,6 +133,13 @@ func (srv *Server) RegisterServices(nm nodes.Manager, conf config.Config) {
grpc_prometheus.Register(srv.s)
}
+func (srv *Server) RegisterMutatorMethods(nm nodes.Manager, datastore datastore.ReplJobsDatastore) {
+ repoServer := repository.NewServer(nm, datastore)
+ proxy.RegisterStreamHandlers(srv.s, "gitaly.RepositoryService", map[string]grpc.StreamHandler{
+ "WriteRef": repoServer.WriteRef,
+ })
+}
+
// Shutdown will attempt a graceful shutdown of the grpc server. If unable
// to gracefully shutdown within the context deadline, it will then
// forcefully shutdown the server and return a context cancellation error.
diff --git a/internal/praefect/service/repository/server.go b/internal/praefect/service/repository/server.go
new file mode 100644
index 000000000..3d8da7b3b
--- /dev/null
+++ b/internal/praefect/service/repository/server.go
@@ -0,0 +1,18 @@
+package repository
+
+import (
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+)
+
+type Server struct {
+ nodeManager nodes.Manager
+ ds datastore.ReplJobsDatastore
+}
+
+func NewServer(nodeMgr nodes.Manager, jobsDS datastore.ReplJobsDatastore) *Server {
+ return &Server{
+ nodeManager: nodeMgr,
+ ds: jobsDS,
+ }
+}
diff --git a/internal/praefect/service/repository/write_ref.go b/internal/praefect/service/repository/write_ref.go
new file mode 100644
index 000000000..455c44b03
--- /dev/null
+++ b/internal/praefect/service/repository/write_ref.go
@@ -0,0 +1,85 @@
+package repository
+
+import (
+ "sync"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc"
+)
+
+func writeRefReqWithStorage(req gitalypb.WriteRefRequest, storage string) *gitalypb.WriteRefRequest {
+ req.Repository = &gitalypb.Repository{
+ StorageName: storage,
+ RelativePath: req.GetRepository().GetRelativePath(),
+ }
+ return &req
+}
+
+func (s *Server) WriteRef(srv interface{}, stream grpc.ServerStream) error {
+ var writeRefReq gitalypb.WriteRefRequest
+
+ if err := stream.RecvMsg(&writeRefReq); err != nil {
+ return err
+ }
+
+ shard, err := s.nodeManager.GetShard(writeRefReq.GetRepository().GetStorageName())
+ if err != nil {
+ return err
+ }
+
+ primary, err := shard.GetPrimary()
+ if err != nil {
+ return err
+ }
+
+ secondaries, err := shard.GetSecondaries()
+ if err != nil {
+ return err
+ }
+
+ if _, err := gitalypb.NewRepositoryServiceClient(
+ primary.GetConnection()).WriteRef(stream.Context(),
+ writeRefReqWithStorage(writeRefReq, primary.GetStorage()),
+ ); err != nil {
+ return err
+ }
+
+ failedNodeStorages := make([]string, len(secondaries))
+ var wg sync.WaitGroup
+ wg.Add(len(secondaries))
+
+ for i, secondary := range secondaries {
+ i := i
+ secondary := secondary
+ go func() {
+ defer wg.Done()
+ client := gitalypb.NewRepositoryServiceClient(secondary.GetConnection())
+
+ if _, err := client.WriteRef(stream.Context(), writeRefReqWithStorage(writeRefReq, secondary.GetStorage())); err != nil {
+ failedNodeStorages[i] = secondary.GetStorage()
+ }
+ }()
+ }
+ wg.Wait()
+
+ var nodeStoragesToReplicate []string
+ for _, storage := range failedNodeStorages {
+ if storage != "" {
+ nodeStoragesToReplicate = append(nodeStoragesToReplicate, storage)
+ }
+ }
+
+ jobIDs, err := s.ds.CreateReplicaReplJobs(writeRefReq.GetRepository().GetRelativePath(), primary.GetStorage(), nodeStoragesToReplicate, datastore.UpdateRepo, nil)
+ if err != nil {
+ return err
+ }
+
+ for _, jobID := range jobIDs {
+ if err := s.ds.UpdateReplJobState(jobID, datastore.JobStateReady); err != nil {
+ return err
+ }
+ }
+
+ return stream.SendMsg(&gitalypb.WriteRefResponse{})
+}