From 939b9c327ff85177f026ecaa8a44bfaaec5ed4e3 Mon Sep 17 00:00:00 2001 From: John Cai Date: Fri, 28 Feb 2020 14:39:02 -0800 Subject: if commit fails, try to recover by replicating --- .../middleware/repositoryhandler/transactions.go | 4 +- internal/praefect/coordinator.go | 55 +++++++++++++++------- internal/praefect/helper_test.go | 2 +- internal/praefect/replicator_test.go | 2 +- internal/service/repository/testhelper_test.go | 2 +- internal/service/repository/write_ref_tx.go | 1 - 6 files changed, 43 insertions(+), 23 deletions(-) diff --git a/internal/middleware/repositoryhandler/transactions.go b/internal/middleware/repositoryhandler/transactions.go index fe8e1ba91..273d7466c 100644 --- a/internal/middleware/repositoryhandler/transactions.go +++ b/internal/middleware/repositoryhandler/transactions.go @@ -6,9 +6,8 @@ import ( "fmt" "reflect" - "github.com/sirupsen/logrus" - "github.com/google/uuid" + "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/git/repository" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" @@ -43,7 +42,6 @@ func RepositoryTransactionUnaryInterceptor(transactions *repository.TransactionM return nil, errors.New("not a repository request") } if repoReq.GetRepository() != nil { - logrus.WithField("repository", repoReq.GetRepository()).Info("trying to start new transaction") transactions.NewTransaction(transactionID, repoReq.GetRepository()) logrus.WithField("repository", repoReq.GetRepository()).Info("started new transaction") diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 09c4458c1..fd3218545 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -11,6 +11,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/protoc-gen-go/descriptor" "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" @@ -37,8 +38,9 @@ type Coordinator struct { datastore datastore.Datastore - registry *protoregistry.Registry - conf config.Config + registry *protoregistry.Registry + conf config.Config + replicator Replicator } // NewCoordinator returns a new Coordinator that utilizes the provided logger @@ -55,6 +57,10 @@ func NewCoordinator(l *logrus.Entry, ds datastore.Datastore, nodeMgr nodes.Manag } } +func (c *Coordinator) SetReplicator(replicator Replicator) { + c.replicator = replicator +} + // RegisterProtos allows coordinator to register new protos on the fly func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto) error { return c.registry.RegisterFiles(protos...) @@ -148,7 +154,6 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, } func (c *Coordinator) HandleWriteRef(srv interface{}, serverStream grpc.ServerStream) error { - c.log.Info("I'm in Handle write rEFFFFFFFFFFFFFFFFFFFFFF!!!!!!!!!!!!!!") var writeRefReq gitalypb.WriteRefRequest if err := serverStream.RecvMsg(&writeRefReq); err != nil { @@ -194,6 +199,7 @@ func (c *Coordinator) HandleWriteRef(srv interface{}, serverStream grpc.ServerSt Force: writeRefReq.Force, }, grpc.Trailer(&trailer)); err != nil { errs = append(errs, err) + continue } if len(trailer.Get("transaction_id")) == 0 { @@ -235,26 +241,43 @@ func (c *Coordinator) HandleWriteRef(srv interface{}, serverStream grpc.ServerSt for transactionID, node := range transactionIDs { client := gitalypb.NewRepositoryServiceClient(node.GetConnection()) if _, err = client.WriteRefTx(metadata.AppendToOutgoingContext(ctx, "transaction_step", "commit", "transaction_id", transactionID), &gitalypb.WriteRefTxRequest{}); err != nil { - return err + if node == primary { + return errors.New("primary write failed") + } + + injectedCtx, err := helper.InjectGitalyServers(metadata.AppendToOutgoingContext(ctx, "transaction_step", "recover", "transaction_id", transactionID), primary.GetStorage(), primary.GetAddress(), primary.GetToken()) + if err != nil { + return err + } + client.ReplicateRepository(injectedCtx, &gitalypb.ReplicateRepositoryRequest{ + Repository: &gitalypb.Repository{ + StorageName: node.GetStorage(), + RelativePath: writeRefReq.GetRepository().GetRelativePath(), + }, + Source: &gitalypb.Repository{ + StorageName: primary.GetStorage(), + RelativePath: writeRefReq.GetRepository().GetRelativePath(), + }, + }) } } - - return nil } + /* - if len(errs) == 0 { - return nil - } + if len(errs) == 0 { + return nil + } - c.log.Info("about to rollback") - // rollback - for _, node := range transactionIDs { - client := gitalypb.NewRepositoryServiceClient(node.GetConnection()) - if _, err = client.WriteRefTx(metadata.AppendToOutgoingContext(ctx, "transaction_step", "rollback"), &gitalypb.WriteRefTxRequest{}); err != nil { - return err + c.log.Info("about to rollback") + // rollback + for _, node := range transactionIDs { + client := gitalypb.NewRepositoryServiceClient(node.GetConnection()) + if _, err = client.WriteRefTx(metadata.AppendToOutgoingContext(ctx, "transaction_step", "rollback"), &gitalypb.WriteRefTxRequest{}); err != nil { + return err + } } - } + */ return nil } diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index e6b955171..2c30be1d6 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -254,7 +254,7 @@ func runInternalGitalyServer(t *testing.T, token string) (*grpc.Server, string, require.NoError(t, rubyServer.Start()) gitalypb.RegisterServerServiceServer(server, gitalyserver.NewServer()) - gitalypb.RegisterRepositoryServiceServer(server, repository.NewServer(repo.NewTransactions(), rubyServer)) + gitalypb.RegisterRepositoryServiceServer(server, repository.NewServer(repo.NewTransactionManager(), rubyServer)) healthpb.RegisterHealthServer(server, health.NewServer()) errQ := make(chan error) diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index f9b039248..9f3d77be0 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -450,7 +450,7 @@ func newReplicationService(tb testing.TB) (*grpc.Server, string) { svr := testhelper.NewTestGrpcServer(tb, nil, nil) - gitalypb.RegisterRepositoryServiceServer(svr, repository.NewServer(repo.NewTransactions(), &rubyserver.Server{})) + gitalypb.RegisterRepositoryServiceServer(svr, repository.NewServer(repo.NewTransactionManager(), &rubyserver.Server{})) gitalypb.RegisterObjectPoolServiceServer(svr, objectpoolservice.NewServer()) gitalypb.RegisterRemoteServiceServer(svr, remote.NewServer(&rubyserver.Server{})) reflection.Register(svr) diff --git a/internal/service/repository/testhelper_test.go b/internal/service/repository/testhelper_test.go index 2c0b43c4c..b2086aea4 100644 --- a/internal/service/repository/testhelper_test.go +++ b/internal/service/repository/testhelper_test.go @@ -85,7 +85,7 @@ func runRepoServer(t *testing.T) (*grpc.Server, string) { t.Fatal(err) } - gitalypb.RegisterRepositoryServiceServer(server, NewServer(repo.NewTransactions(), RubyServer)) + gitalypb.RegisterRepositoryServiceServer(server, NewServer(repo.NewTransactionManager(), RubyServer)) reflection.Register(server) go server.Serve(listener) diff --git a/internal/service/repository/write_ref_tx.go b/internal/service/repository/write_ref_tx.go index 457914e97..f146e92aa 100644 --- a/internal/service/repository/write_ref_tx.go +++ b/internal/service/repository/write_ref_tx.go @@ -7,7 +7,6 @@ import ( "os/exec" "github.com/sirupsen/logrus" - "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/helper/text" -- cgit v1.2.3