diff options
author | John Cai <jcai@gitlab.com> | 2020-03-10 03:06:19 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-03-10 21:16:54 +0300 |
commit | 3181a2f6af9c3ae6c9926ff2bdf1de5f3842ade3 (patch) | |
tree | ca0539fe19b1c44608d3754cd0e285696c4b355c | |
parent | cd501ca5ad38654b39c1d3c3908f4332e08f85d5 (diff) |
Handle WriteRef in praefect with strong consistencyjc-writeref-strong-consistency
-rw-r--r-- | cmd/praefect/main.go | 1 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 22 | ||||
-rw-r--r-- | internal/praefect/mutator_test.go | 247 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 2 | ||||
-rw-r--r-- | internal/praefect/server.go | 9 | ||||
-rw-r--r-- | internal/praefect/service/repository/server.go | 18 | ||||
-rw-r--r-- | internal/praefect/service/repository/write_ref.go | 85 |
7 files changed, 381 insertions, 3 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index ae81f1680..33f65a8eb 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -184,6 +184,7 @@ func run(cfgs []starter.Config, conf config.Config) error { } srv.RegisterServices(nodeManager, conf) + srv.RegisterMutatorMethods(nodeManager, ds) b.StopAction = srv.GracefulStop for _, cfg := range cfgs { diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index b87e404db..e5c67012f 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -22,8 +22,12 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" "gitlab.com/gitlab-org/gitaly/internal/server/auth" + "gitlab.com/gitlab-org/gitaly/internal/service/objectpool" + "gitlab.com/gitlab-org/gitaly/internal/service/ref" + "gitlab.com/gitlab-org/gitaly/internal/service/remote" "gitlab.com/gitlab-org/gitaly/internal/service/repository" gitalyserver "gitlab.com/gitlab-org/gitaly/internal/service/server" + "gitlab.com/gitlab-org/gitaly/internal/service/ssh" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" @@ -158,9 +162,14 @@ func noopBackoffFunc() (backoff, backoffReset) { }, func() {} } +type PraefectComponents struct { + NodeManager nodes.Manager + Datastore datastore.Datastore +} + // runPraefectServerWithGitaly runs a praefect server with actual Gitaly nodes // requires exactly 1 virtual storage -func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server, testhelper.Cleanup) { +func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, PraefectComponents, testhelper.Cleanup) { require.Len(t, conf.VirtualStorages, 1) var cleanups []testhelper.Cleanup @@ -205,6 +214,8 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client ctx, cancel := testhelper.Context() prf.RegisterServices(nodeMgr, conf) + prf.RegisterMutatorMethods(nodeMgr, ds) + go func() { errQ <- prf.Serve(listener, false) }() go func() { errQ <- replmgr.ProcessBacklog(ctx, noopBackoffFunc) }() @@ -224,7 +235,10 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client require.Error(t, context.Canceled, <-errQ) } - return cc, prf, cleanup + return cc, PraefectComponents{ + NodeManager: nodeMgr, + Datastore: ds, + }, cleanup } func runInternalGitalyServer(t *testing.T, token string) (*grpc.Server, string, func()) { @@ -246,6 +260,10 @@ func runInternalGitalyServer(t *testing.T, token string) (*grpc.Server, string, gitalypb.RegisterServerServiceServer(server, gitalyserver.NewServer()) gitalypb.RegisterRepositoryServiceServer(server, repository.NewServer(rubyServer, internalSocket)) + gitalypb.RegisterRefServiceServer(server, ref.NewServer()) + gitalypb.RegisterRemoteServiceServer(server, remote.NewServer(rubyServer)) + gitalypb.RegisterObjectPoolServiceServer(server, objectpool.NewServer()) + gitalypb.RegisterSSHServiceServer(server, ssh.NewServer()) healthpb.RegisterHealthServer(server, health.NewServer()) errQ := make(chan error) diff --git a/internal/praefect/mutator_test.go b/internal/praefect/mutator_test.go new file mode 100644 index 000000000..63537522d --- /dev/null +++ b/internal/praefect/mutator_test.go @@ -0,0 +1,247 @@ +package praefect + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" +) + +func TestRepoService_WriteRef_Success(t *testing.T) { + conf := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + { + Name: "default", + Nodes: []*models.Node{ + { + Storage: "praefect-internal-1", + DefaultPrimary: true, + }, + { + Storage: "praefect-internal-2", + }, + { + Storage: "praefect-internal-3", + }, + }, + }, + }, + } + + testRepo, testRepoPath, cleanup := testhelper.NewTestRepo(t) + defer cleanup() + + newRev, _ := testhelper.CreateCommitOnNewBranch(t, testRepoPath) + + cleanupTempStoragePaths := createTempStoragePaths(t, conf.VirtualStorages[0].Nodes) + defer cleanupTempStoragePaths() + + for _, node := range conf.VirtualStorages[0].Nodes { + cloneRepoAtStorage(t, testRepo, node.Storage) + } + + cc, components, cleanup := runPraefectServerWithGitaly(t, conf) + defer cleanup() + + nodeMgr := components.NodeManager + + shard, err := nodeMgr.GetShard("default") + require.NoError(t, err) + + primary, err := shard.GetPrimary() + require.NoError(t, err) + + ctx, cancel := testhelper.Context() + defer cancel() + + ref := "refs/heads/master" + branch, err := gitalypb.NewRefServiceClient(primary.GetConnection()).FindBranch(ctx, &gitalypb.FindBranchRequest{ + Repository: &gitalypb.Repository{ + StorageName: primary.GetStorage(), + RelativePath: testRepo.GetRelativePath(), + }, + Name: []byte(ref), + }) + require.NoError(t, err) + + oldRev := branch.Branch.TargetCommit.Id + + _, err = gitalypb.NewRepositoryServiceClient(cc).WriteRef(ctx, &gitalypb.WriteRefRequest{ + Repository: testRepo, + Ref: []byte(ref), + Revision: []byte(newRev), + OldRevision: []byte(oldRev), + Force: false, + }) + require.NoError(t, err) + + secondaries, err := shard.GetSecondaries() + require.NoError(t, err) + + for _, secondary := range secondaries { + branch, err = gitalypb.NewRefServiceClient(secondary.GetConnection()).FindBranch(ctx, &gitalypb.FindBranchRequest{ + Repository: &gitalypb.Repository{ + StorageName: secondary.GetStorage(), + RelativePath: testRepo.GetRelativePath(), + }, + Name: []byte(ref), + }) + require.NoError(t, err) + require.Equal(t, newRev, branch.GetBranch().TargetCommit.Id) + } +} + +func TestRepoService_WriteRef_Replication(t *testing.T) { + conf := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + { + Name: "default", + Nodes: []*models.Node{ + { + Storage: "praefect-internal-1", + DefaultPrimary: true, + }, + { + Storage: "praefect-internal-2", + }, + { + Storage: "praefect-internal-3", + }, + }, + }, + }, + } + + testRepo, _, cleanup := testhelper.NewTestRepo(t) + defer cleanup() + + cleanupTempStoragePaths := createTempStoragePaths(t, conf.VirtualStorages[0].Nodes) + defer cleanupTempStoragePaths() + + for _, node := range conf.VirtualStorages[0].Nodes { + cloneRepoAtStorage(t, testRepo, node.Storage) + } + + primaryRepoPath, err := helper.GetRepoPath(&gitalypb.Repository{ + StorageName: conf.VirtualStorages[0].Nodes[0].Storage, + RelativePath: testRepo.GetRelativePath(), + }) + require.NoError(t, err) + + newRev, _ := testhelper.CreateCommitOnNewBranch(t, primaryRepoPath) + + cc, components, cleanupServers := runPraefectServerWithGitaly(t, conf) + defer cleanupServers() + + nodeMgr := components.NodeManager + + shard, err := nodeMgr.GetShard("default") + require.NoError(t, err) + + primary, err := shard.GetPrimary() + require.NoError(t, err) + + ctx, cancel := testhelper.Context() + defer cancel() + + ref := "refs/heads/master" + branch, err := gitalypb.NewRefServiceClient(primary.GetConnection()).FindBranch(ctx, &gitalypb.FindBranchRequest{ + Repository: &gitalypb.Repository{ + StorageName: primary.GetStorage(), + RelativePath: testRepo.GetRelativePath(), + }, + Name: []byte(ref), + }) + require.NoError(t, err) + + oldRev := branch.Branch.TargetCommit.Id + + _, err = gitalypb.NewRepositoryServiceClient(cc).WriteRef(ctx, &gitalypb.WriteRefRequest{ + Repository: testRepo, + Ref: []byte(ref), + Revision: []byte(newRev), + OldRevision: []byte(oldRev), + Force: false, + }) + require.NoError(t, err) + + secondaries, err := shard.GetSecondaries() + require.NoError(t, err) + + for _, secondary := range secondaries { + branch, err = gitalypb.NewRefServiceClient(secondary.GetConnection()).FindBranch(ctx, &gitalypb.FindBranchRequest{ + Repository: &gitalypb.Repository{ + StorageName: secondary.GetStorage(), + RelativePath: testRepo.GetRelativePath(), + }, + Name: []byte(ref), + }) + require.NoError(t, err) + require.NotEqual(t, newRev, branch.GetBranch().TargetCommit.Id, "write ref should have failed on the secondaries") + } + + timer := time.NewTimer(20 * time.Second) + defer timer.Stop() + +Test: + for { + select { + case <-timer.C: + t.Fatal("time limit expired and jobs have not been completed") + default: + secondary1Jobs, err := components.Datastore.GetJobs( + datastore.JobStateComplete|datastore.JobStateInProgress|datastore.JobStateDead|datastore.JobStateReady|datastore.JobStatePending, + conf.VirtualStorages[0].Nodes[1].Storage, 1) + require.NoError(t, err) + secondary2Jobs, err := components.Datastore.GetJobs( + datastore.JobStateComplete|datastore.JobStateInProgress|datastore.JobStateDead|datastore.JobStateReady|datastore.JobStatePending, + conf.VirtualStorages[0].Nodes[2].Storage, 1) + require.NoError(t, err) + if len(secondary1Jobs) == 0 && len(secondary2Jobs) == 0 { + break Test + } + <-time.Tick(100 * time.Millisecond) + } + } + + for _, secondary := range secondaries { + branch, err = gitalypb.NewRefServiceClient(secondary.GetConnection()).FindBranch(ctx, &gitalypb.FindBranchRequest{ + Repository: &gitalypb.Repository{ + StorageName: secondary.GetStorage(), + RelativePath: testRepo.GetRelativePath(), + }, + Name: []byte(ref), + }) + require.NoError(t, err) + require.Equal(t, newRev, branch.GetBranch().TargetCommit.Id, "write ref should have failed on the secondaries") + } +} + +func createTempStoragePaths(t *testing.T, nodes []*models.Node) func() { + oldStorages := gitalyconfig.Config.Storages + + var tempDirCleanups []func() error + for _, node := range nodes { + tempPath, cleanup := testhelper.TempDir(t, node.Storage) + tempDirCleanups = append(tempDirCleanups, cleanup) + + gitalyconfig.Config.Storages = append(gitalyconfig.Config.Storages, gitalyconfig.Storage{ + Name: node.Storage, + Path: tempPath, + }) + } + + return func() { + gitalyconfig.Config.Storages = oldStorages + for _, tempDirCleanup := range tempDirCleanups { + tempDirCleanup() + } + } +} diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 769d3a56c..b9c40474b 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -48,7 +48,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, job datastore.ReplJob Source: sourceRepository, Repository: targetRepository, }); err != nil { - return fmt.Errorf("failed to create repository: %v", err) + return fmt.Errorf("failed to replicate repository: %v", err) } // check if the repository has an object pool diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 41f21974e..06b228fc7 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -17,11 +17,13 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/praefect/service/info" + "gitlab.com/gitlab-org/gitaly/internal/praefect/service/repository" "gitlab.com/gitlab-org/gitaly/internal/praefect/service/server" "gitlab.com/gitlab-org/gitaly/internal/server/auth" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" @@ -131,6 +133,13 @@ func (srv *Server) RegisterServices(nm nodes.Manager, conf config.Config) { grpc_prometheus.Register(srv.s) } +func (srv *Server) RegisterMutatorMethods(nm nodes.Manager, datastore datastore.ReplJobsDatastore) { + repoServer := repository.NewServer(nm, datastore) + proxy.RegisterStreamHandlers(srv.s, "gitaly.RepositoryService", map[string]grpc.StreamHandler{ + "WriteRef": repoServer.WriteRef, + }) +} + // Shutdown will attempt a graceful shutdown of the grpc server. If unable // to gracefully shutdown within the context deadline, it will then // forcefully shutdown the server and return a context cancellation error. diff --git a/internal/praefect/service/repository/server.go b/internal/praefect/service/repository/server.go new file mode 100644 index 000000000..3d8da7b3b --- /dev/null +++ b/internal/praefect/service/repository/server.go @@ -0,0 +1,18 @@ +package repository + +import ( + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" +) + +type Server struct { + nodeManager nodes.Manager + ds datastore.ReplJobsDatastore +} + +func NewServer(nodeMgr nodes.Manager, jobsDS datastore.ReplJobsDatastore) *Server { + return &Server{ + nodeManager: nodeMgr, + ds: jobsDS, + } +} diff --git a/internal/praefect/service/repository/write_ref.go b/internal/praefect/service/repository/write_ref.go new file mode 100644 index 000000000..455c44b03 --- /dev/null +++ b/internal/praefect/service/repository/write_ref.go @@ -0,0 +1,85 @@ +package repository + +import ( + "sync" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" +) + +func writeRefReqWithStorage(req gitalypb.WriteRefRequest, storage string) *gitalypb.WriteRefRequest { + req.Repository = &gitalypb.Repository{ + StorageName: storage, + RelativePath: req.GetRepository().GetRelativePath(), + } + return &req +} + +func (s *Server) WriteRef(srv interface{}, stream grpc.ServerStream) error { + var writeRefReq gitalypb.WriteRefRequest + + if err := stream.RecvMsg(&writeRefReq); err != nil { + return err + } + + shard, err := s.nodeManager.GetShard(writeRefReq.GetRepository().GetStorageName()) + if err != nil { + return err + } + + primary, err := shard.GetPrimary() + if err != nil { + return err + } + + secondaries, err := shard.GetSecondaries() + if err != nil { + return err + } + + if _, err := gitalypb.NewRepositoryServiceClient( + primary.GetConnection()).WriteRef(stream.Context(), + writeRefReqWithStorage(writeRefReq, primary.GetStorage()), + ); err != nil { + return err + } + + failedNodeStorages := make([]string, len(secondaries)) + var wg sync.WaitGroup + wg.Add(len(secondaries)) + + for i, secondary := range secondaries { + i := i + secondary := secondary + go func() { + defer wg.Done() + client := gitalypb.NewRepositoryServiceClient(secondary.GetConnection()) + + if _, err := client.WriteRef(stream.Context(), writeRefReqWithStorage(writeRefReq, secondary.GetStorage())); err != nil { + failedNodeStorages[i] = secondary.GetStorage() + } + }() + } + wg.Wait() + + var nodeStoragesToReplicate []string + for _, storage := range failedNodeStorages { + if storage != "" { + nodeStoragesToReplicate = append(nodeStoragesToReplicate, storage) + } + } + + jobIDs, err := s.ds.CreateReplicaReplJobs(writeRefReq.GetRepository().GetRelativePath(), primary.GetStorage(), nodeStoragesToReplicate, datastore.UpdateRepo, nil) + if err != nil { + return err + } + + for _, jobID := range jobIDs { + if err := s.ds.UpdateReplJobState(jobID, datastore.JobStateReady); err != nil { + return err + } + } + + return stream.SendMsg(&gitalypb.WriteRefResponse{}) +} |