Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2020-02-29 01:39:02 +0300
committerJohn Cai <jcai@gitlab.com>2020-03-03 22:26:54 +0300
commit939b9c327ff85177f026ecaa8a44bfaaec5ed4e3 (patch)
treec2387e21c0eb7cc49d05d188ea4a7b61d2fc3dbe
parent6da6bc59fcc61e71aa96c4159549f5ca28bdf9e6 (diff)
if commit fails, try to recover by replicatingjc-3phase-writeref
-rw-r--r--internal/middleware/repositoryhandler/transactions.go4
-rw-r--r--internal/praefect/coordinator.go55
-rw-r--r--internal/praefect/helper_test.go2
-rw-r--r--internal/praefect/replicator_test.go2
-rw-r--r--internal/service/repository/testhelper_test.go2
-rw-r--r--internal/service/repository/write_ref_tx.go1
6 files changed, 43 insertions, 23 deletions
diff --git a/internal/middleware/repositoryhandler/transactions.go b/internal/middleware/repositoryhandler/transactions.go
index fe8e1ba91..273d7466c 100644
--- a/internal/middleware/repositoryhandler/transactions.go
+++ b/internal/middleware/repositoryhandler/transactions.go
@@ -6,9 +6,8 @@ import (
"fmt"
"reflect"
- "github.com/sirupsen/logrus"
-
"github.com/google/uuid"
+ "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/git/repository"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -43,7 +42,6 @@ func RepositoryTransactionUnaryInterceptor(transactions *repository.TransactionM
return nil, errors.New("not a repository request")
}
if repoReq.GetRepository() != nil {
-
logrus.WithField("repository", repoReq.GetRepository()).Info("trying to start new transaction")
transactions.NewTransaction(transactionID, repoReq.GetRepository())
logrus.WithField("repository", repoReq.GetRepository()).Info("started new transaction")
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 09c4458c1..fd3218545 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -11,6 +11,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/protoc-gen-go/descriptor"
"github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
@@ -37,8 +38,9 @@ type Coordinator struct {
datastore datastore.Datastore
- registry *protoregistry.Registry
- conf config.Config
+ registry *protoregistry.Registry
+ conf config.Config
+ replicator Replicator
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
@@ -55,6 +57,10 @@ func NewCoordinator(l *logrus.Entry, ds datastore.Datastore, nodeMgr nodes.Manag
}
}
+func (c *Coordinator) SetReplicator(replicator Replicator) {
+ c.replicator = replicator
+}
+
// RegisterProtos allows coordinator to register new protos on the fly
func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto) error {
return c.registry.RegisterFiles(protos...)
@@ -148,7 +154,6 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
}
func (c *Coordinator) HandleWriteRef(srv interface{}, serverStream grpc.ServerStream) error {
- c.log.Info("I'm in Handle write rEFFFFFFFFFFFFFFFFFFFFFF!!!!!!!!!!!!!!")
var writeRefReq gitalypb.WriteRefRequest
if err := serverStream.RecvMsg(&writeRefReq); err != nil {
@@ -194,6 +199,7 @@ func (c *Coordinator) HandleWriteRef(srv interface{}, serverStream grpc.ServerSt
Force: writeRefReq.Force,
}, grpc.Trailer(&trailer)); err != nil {
errs = append(errs, err)
+ continue
}
if len(trailer.Get("transaction_id")) == 0 {
@@ -235,26 +241,43 @@ func (c *Coordinator) HandleWriteRef(srv interface{}, serverStream grpc.ServerSt
for transactionID, node := range transactionIDs {
client := gitalypb.NewRepositoryServiceClient(node.GetConnection())
if _, err = client.WriteRefTx(metadata.AppendToOutgoingContext(ctx, "transaction_step", "commit", "transaction_id", transactionID), &gitalypb.WriteRefTxRequest{}); err != nil {
- return err
+ if node == primary {
+ return errors.New("primary write failed")
+ }
+
+ injectedCtx, err := helper.InjectGitalyServers(metadata.AppendToOutgoingContext(ctx, "transaction_step", "recover", "transaction_id", transactionID), primary.GetStorage(), primary.GetAddress(), primary.GetToken())
+ if err != nil {
+ return err
+ }
+ client.ReplicateRepository(injectedCtx, &gitalypb.ReplicateRepositoryRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: node.GetStorage(),
+ RelativePath: writeRefReq.GetRepository().GetRelativePath(),
+ },
+ Source: &gitalypb.Repository{
+ StorageName: primary.GetStorage(),
+ RelativePath: writeRefReq.GetRepository().GetRelativePath(),
+ },
+ })
}
}
-
- return nil
}
+ /*
- if len(errs) == 0 {
- return nil
- }
+ if len(errs) == 0 {
+ return nil
+ }
- c.log.Info("about to rollback")
- // rollback
- for _, node := range transactionIDs {
- client := gitalypb.NewRepositoryServiceClient(node.GetConnection())
- if _, err = client.WriteRefTx(metadata.AppendToOutgoingContext(ctx, "transaction_step", "rollback"), &gitalypb.WriteRefTxRequest{}); err != nil {
- return err
+ c.log.Info("about to rollback")
+ // rollback
+ for _, node := range transactionIDs {
+ client := gitalypb.NewRepositoryServiceClient(node.GetConnection())
+ if _, err = client.WriteRefTx(metadata.AppendToOutgoingContext(ctx, "transaction_step", "rollback"), &gitalypb.WriteRefTxRequest{}); err != nil {
+ return err
+ }
}
- }
+ */
return nil
}
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index e6b955171..2c30be1d6 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -254,7 +254,7 @@ func runInternalGitalyServer(t *testing.T, token string) (*grpc.Server, string,
require.NoError(t, rubyServer.Start())
gitalypb.RegisterServerServiceServer(server, gitalyserver.NewServer())
- gitalypb.RegisterRepositoryServiceServer(server, repository.NewServer(repo.NewTransactions(), rubyServer))
+ gitalypb.RegisterRepositoryServiceServer(server, repository.NewServer(repo.NewTransactionManager(), rubyServer))
healthpb.RegisterHealthServer(server, health.NewServer())
errQ := make(chan error)
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index f9b039248..9f3d77be0 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -450,7 +450,7 @@ func newReplicationService(tb testing.TB) (*grpc.Server, string) {
svr := testhelper.NewTestGrpcServer(tb, nil, nil)
- gitalypb.RegisterRepositoryServiceServer(svr, repository.NewServer(repo.NewTransactions(), &rubyserver.Server{}))
+ gitalypb.RegisterRepositoryServiceServer(svr, repository.NewServer(repo.NewTransactionManager(), &rubyserver.Server{}))
gitalypb.RegisterObjectPoolServiceServer(svr, objectpoolservice.NewServer())
gitalypb.RegisterRemoteServiceServer(svr, remote.NewServer(&rubyserver.Server{}))
reflection.Register(svr)
diff --git a/internal/service/repository/testhelper_test.go b/internal/service/repository/testhelper_test.go
index 2c0b43c4c..b2086aea4 100644
--- a/internal/service/repository/testhelper_test.go
+++ b/internal/service/repository/testhelper_test.go
@@ -85,7 +85,7 @@ func runRepoServer(t *testing.T) (*grpc.Server, string) {
t.Fatal(err)
}
- gitalypb.RegisterRepositoryServiceServer(server, NewServer(repo.NewTransactions(), RubyServer))
+ gitalypb.RegisterRepositoryServiceServer(server, NewServer(repo.NewTransactionManager(), RubyServer))
reflection.Register(server)
go server.Serve(listener)
diff --git a/internal/service/repository/write_ref_tx.go b/internal/service/repository/write_ref_tx.go
index 457914e97..f146e92aa 100644
--- a/internal/service/repository/write_ref_tx.go
+++ b/internal/service/repository/write_ref_tx.go
@@ -7,7 +7,6 @@ import (
"os/exec"
"github.com/sirupsen/logrus"
-
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/helper/text"