diff options
author | Paul Okstad <pokstad@gitlab.com> | 2021-01-27 11:25:35 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2021-03-12 20:04:19 +0300 |
commit | 81d7f8d27a5b7c0538ab250727c5dfef81205960 (patch) | |
tree | d21bd905b8f5b5b08b87f3cedfc51da0717de425 | |
parent | e41a98b009322b03bbc7c87cf57a02adfec8f4f3 (diff) |
Test Gitaly transaction service with coordinator
-rw-r--r-- | internal/praefect/coordinator_test.go | 116 |
1 files changed, 115 insertions, 1 deletions
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index dc4dd77bc..3ab5e5ffa 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -13,11 +13,13 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/empty" + "github.com/google/uuid" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/git/gittest" + gtransaction "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/transaction" "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" @@ -276,7 +278,8 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { }, } - txMgr := transactions.NewManager(conf) + routeUUID := uuid.New() + txMgr := transactions.NewManager(conf, transactions.WithRouteUUID(routeUUID)) coordinator := NewCoordinator( datastore.NewMemoryReplicationEventQueue(conf), @@ -340,6 +343,117 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { err = streamParams.RequestFinalizer() require.NoError(t, err) + + t.Run("with Gitaly transaction service enabled", func(t *testing.T) { + ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, featureflag.GitalyTxSvc) + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + // register a gitaly transaction service for each node + gitalyClients := map[string]gitalypb.RefTransactionClient{} + connSet := map[string]map[string]*grpc.ClientConn{} + for _, vs := range conf.VirtualStorages { + connSet[vs.Name] = map[string]*grpc.ClientConn{} + for _, node := range vs.Nodes { + srv := testhelper.NewServer(t, nil, nil) + gitalypb.RegisterRefTransactionServer( + srv.GrpcServer(), gtransaction.NewServer()) + + srv.Start(t) + defer srv.Stop() + + addr := "unix://" + srv.Socket() + + cc, err := client.Dial(addr, nil) + require.NoError(t, err) + defer cc.Close() + + node.Address = addr + gitalyClients[node.Storage] = gitalypb.NewRefTransactionClient(cc) + connSet[vs.Name][node.Storage] = cc + } + } + + join := txMgr.StartRoutingVotes(ctx, connSet) + defer join() + + // wait until the routes are up + + streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) + require.NoError(t, err) + defer func() { require.NoError(t, streamParams.RequestFinalizer()) }() + + transaction, err := praefect_metadata.TransactionFromContext(streamParams.Primary().Ctx) + require.NoError(t, err) + + var wg sync.WaitGroup + var syncWG sync.WaitGroup + + wg.Add(2) + defer wg.Wait() + + syncWG.Add(2) + + stopped := make(chan struct{}) // tells us when the transaction has been stopped + go func() { + defer wg.Done() + + vote := sha1.Sum([]byte("vote")) + + req := &gitalypb.VoteTransactionRequest{ + Repository: &repo, + TransactionId: transaction.ID, + Node: "primary", + ReferenceUpdatesHash: vote[:], + RouteUuid: routeUUID.String(), + } + resp, err := gitalyClients["primary"].VoteTransaction(ctx, req) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, resp.State) + + // Assure that at least one vote was agreed on. + syncWG.Done() + syncWG.Wait() + + stopReq := &gitalypb.StopTransactionRequest{ + Repository: &repo, + TransactionId: transaction.ID, + RouteUuid: routeUUID.String(), + } + _, err = gitalyClients["primary"].StopTransaction(ctx, stopReq) + require.NoError(t, err) + close(stopped) + }() + + go func() { + defer wg.Done() + + vote := sha1.Sum([]byte("vote")) + + req := &gitalypb.VoteTransactionRequest{ + Repository: &repo, + TransactionId: transaction.ID, + Node: "secondary", + ReferenceUpdatesHash: vote[:], + RouteUuid: routeUUID.String(), + } + resp, err := gitalyClients["secondary"].VoteTransaction(ctx, req) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, resp.State) + + // Assure that at least one vote was agreed on. + syncWG.Done() + syncWG.Wait() + + <-stopped + resp, err = gitalyClients["secondary"].VoteTransaction(ctx, req) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_STOP, resp.State) + + cancel() // test is done, stop routing votes + }() + }) } func TestStreamDirectorAccessor(t *testing.T) { |