Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2021-01-27 11:25:35 +0300
committerPaul Okstad <pokstad@gitlab.com>2021-03-12 20:04:19 +0300
commit81d7f8d27a5b7c0538ab250727c5dfef81205960 (patch)
treed21bd905b8f5b5b08b87f3cedfc51da0717de425
parente41a98b009322b03bbc7c87cf57a02adfec8f4f3 (diff)
Test Gitaly transaction service with coordinator
-rw-r--r--internal/praefect/coordinator_test.go116
1 files changed, 115 insertions, 1 deletions
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index dc4dd77bc..3ab5e5ffa 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -13,11 +13,13 @@ import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/empty"
+ "github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/git/gittest"
+ gtransaction "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/transaction"
"gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
@@ -276,7 +278,8 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
},
}
- txMgr := transactions.NewManager(conf)
+ routeUUID := uuid.New()
+ txMgr := transactions.NewManager(conf, transactions.WithRouteUUID(routeUUID))
coordinator := NewCoordinator(
datastore.NewMemoryReplicationEventQueue(conf),
@@ -340,6 +343,117 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
err = streamParams.RequestFinalizer()
require.NoError(t, err)
+
+ t.Run("with Gitaly transaction service enabled", func(t *testing.T) {
+ ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, featureflag.GitalyTxSvc)
+
+ ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+ defer cancel()
+
+ // register a gitaly transaction service for each node
+ gitalyClients := map[string]gitalypb.RefTransactionClient{}
+ connSet := map[string]map[string]*grpc.ClientConn{}
+ for _, vs := range conf.VirtualStorages {
+ connSet[vs.Name] = map[string]*grpc.ClientConn{}
+ for _, node := range vs.Nodes {
+ srv := testhelper.NewServer(t, nil, nil)
+ gitalypb.RegisterRefTransactionServer(
+ srv.GrpcServer(), gtransaction.NewServer())
+
+ srv.Start(t)
+ defer srv.Stop()
+
+ addr := "unix://" + srv.Socket()
+
+ cc, err := client.Dial(addr, nil)
+ require.NoError(t, err)
+ defer cc.Close()
+
+ node.Address = addr
+ gitalyClients[node.Storage] = gitalypb.NewRefTransactionClient(cc)
+ connSet[vs.Name][node.Storage] = cc
+ }
+ }
+
+ join := txMgr.StartRoutingVotes(ctx, connSet)
+ defer join()
+
+ // wait until the routes are up
+
+ streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, streamParams.RequestFinalizer()) }()
+
+ transaction, err := praefect_metadata.TransactionFromContext(streamParams.Primary().Ctx)
+ require.NoError(t, err)
+
+ var wg sync.WaitGroup
+ var syncWG sync.WaitGroup
+
+ wg.Add(2)
+ defer wg.Wait()
+
+ syncWG.Add(2)
+
+ stopped := make(chan struct{}) // tells us when the transaction has been stopped
+ go func() {
+ defer wg.Done()
+
+ vote := sha1.Sum([]byte("vote"))
+
+ req := &gitalypb.VoteTransactionRequest{
+ Repository: &repo,
+ TransactionId: transaction.ID,
+ Node: "primary",
+ ReferenceUpdatesHash: vote[:],
+ RouteUuid: routeUUID.String(),
+ }
+ resp, err := gitalyClients["primary"].VoteTransaction(ctx, req)
+ require.NoError(t, err)
+ require.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, resp.State)
+
+ // Assure that at least one vote was agreed on.
+ syncWG.Done()
+ syncWG.Wait()
+
+ stopReq := &gitalypb.StopTransactionRequest{
+ Repository: &repo,
+ TransactionId: transaction.ID,
+ RouteUuid: routeUUID.String(),
+ }
+ _, err = gitalyClients["primary"].StopTransaction(ctx, stopReq)
+ require.NoError(t, err)
+ close(stopped)
+ }()
+
+ go func() {
+ defer wg.Done()
+
+ vote := sha1.Sum([]byte("vote"))
+
+ req := &gitalypb.VoteTransactionRequest{
+ Repository: &repo,
+ TransactionId: transaction.ID,
+ Node: "secondary",
+ ReferenceUpdatesHash: vote[:],
+ RouteUuid: routeUUID.String(),
+ }
+ resp, err := gitalyClients["secondary"].VoteTransaction(ctx, req)
+ require.NoError(t, err)
+ require.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, resp.State)
+
+ // Assure that at least one vote was agreed on.
+ syncWG.Done()
+ syncWG.Wait()
+
+ <-stopped
+ resp, err = gitalyClients["secondary"].VoteTransaction(ctx, req)
+ require.NoError(t, err)
+ require.Equal(t, gitalypb.VoteTransactionResponse_STOP, resp.State)
+
+ cancel() // test is done, stop routing votes
+ }()
+ })
}
func TestStreamDirectorAccessor(t *testing.T) {