diff options
Diffstat (limited to 'internal/gitaly/transaction/manager_test.go')
-rw-r--r-- | internal/gitaly/transaction/manager_test.go | 271 |
1 files changed, 132 insertions, 139 deletions
diff --git a/internal/gitaly/transaction/manager_test.go b/internal/gitaly/transaction/manager_test.go index 4e4117ed0..0ed6773d0 100644 --- a/internal/gitaly/transaction/manager_test.go +++ b/internal/gitaly/transaction/manager_test.go @@ -1,4 +1,4 @@ -package transaction +package transaction_test import ( "context" @@ -9,11 +9,15 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/backchannel" "gitlab.com/gitlab-org/gitaly/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/gitaly/config" - "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag" - "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata" + "gitlab.com/gitlab-org/gitaly/internal/gitaly/service" + "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/internal/transaction/txinfo" + "gitlab.com/gitlab-org/gitaly/internal/transaction/voting" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -38,156 +42,145 @@ func (s *testTransactionServer) StopTransaction(ctx context.Context, in *gitalyp } func TestPoolManager_Vote(t *testing.T) { - testhelper.NewFeatureSets([]featureflag.FeatureFlag{ - featureflag.BackchannelVoting, - }).Run(t, func(t *testing.T, ctx context.Context) { - cfg := testcfg.Build(t) - - transactionServer, praefect, stop := runTransactionServer(t, cfg) - defer stop() - - registry := backchannel.NewRegistry() - if featureflag.IsEnabled(ctx, featureflag.BackchannelVoting) { - backchannelConn, err := client.Dial(ctx, praefect.ListenAddr, nil, nil) - require.NoError(t, err) - defer backchannelConn.Close() - praefect = metadata.PraefectServer{BackchannelID: registry.RegisterBackchannel(backchannelConn)} - } - - manager := NewManager(cfg, registry) - - for _, tc := range []struct { - desc string - transaction metadata.Transaction - vote Vote - voteFn func(*testing.T, *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) - expectedErr error - }{ - { - desc: "successful vote", - transaction: metadata.Transaction{ - ID: 1, - Node: "node", - }, - vote: VoteFromData([]byte("foobar")), - voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { - require.Equal(t, uint64(1), request.TransactionId) - require.Equal(t, "node", request.Node) - require.Equal(t, request.ReferenceUpdatesHash, VoteFromData([]byte("foobar")).Bytes()) - - return &gitalypb.VoteTransactionResponse{ - State: gitalypb.VoteTransactionResponse_COMMIT, - }, nil - }, + cfg := testcfg.Build(t) + + transactionServer, praefect := runTransactionServer(t, cfg) + + ctx, cancel := testhelper.Context() + defer cancel() + + registry := backchannel.NewRegistry() + backchannelConn, err := client.Dial(ctx, praefect.ListenAddr, nil, nil) + require.NoError(t, err) + defer backchannelConn.Close() + praefect = txinfo.PraefectServer{BackchannelID: registry.RegisterBackchannel(backchannelConn)} + + manager := transaction.NewManager(cfg, registry) + + for _, tc := range []struct { + desc string + transaction txinfo.Transaction + vote voting.Vote + voteFn func(*testing.T, *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) + expectedErr error + }{ + { + desc: "successful vote", + transaction: txinfo.Transaction{ + ID: 1, + Node: "node", }, - { - desc: "aborted vote", - voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { - return &gitalypb.VoteTransactionResponse{ - State: gitalypb.VoteTransactionResponse_ABORT, - }, nil - }, - expectedErr: errors.New("transaction was aborted"), + vote: voting.VoteFromData([]byte("foobar")), + voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { + require.Equal(t, uint64(1), request.TransactionId) + require.Equal(t, "node", request.Node) + require.Equal(t, request.ReferenceUpdatesHash, voting.VoteFromData([]byte("foobar")).Bytes()) + + return &gitalypb.VoteTransactionResponse{ + State: gitalypb.VoteTransactionResponse_COMMIT, + }, nil }, - { - desc: "stopped vote", - voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { - return &gitalypb.VoteTransactionResponse{ - State: gitalypb.VoteTransactionResponse_STOP, - }, nil - }, - expectedErr: errors.New("transaction was stopped"), + }, + { + desc: "aborted vote", + voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { + return &gitalypb.VoteTransactionResponse{ + State: gitalypb.VoteTransactionResponse_ABORT, + }, nil }, - { - desc: "erroneous vote", - voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { - return nil, status.Error(codes.Internal, "foobar") - }, - expectedErr: status.Error(codes.Internal, "foobar"), + expectedErr: errors.New("transaction was aborted"), + }, + { + desc: "stopped vote", + voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { + return &gitalypb.VoteTransactionResponse{ + State: gitalypb.VoteTransactionResponse_STOP, + }, nil }, - } { - t.Run(tc.desc, func(t *testing.T) { - transactionServer.vote = func(request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { - return tc.voteFn(t, request) - } - - err := manager.Vote(ctx, tc.transaction, praefect, tc.vote) - require.Equal(t, tc.expectedErr, err) - }) - } - }) + expectedErr: errors.New("transaction was stopped"), + }, + { + desc: "erroneous vote", + voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { + return nil, status.Error(codes.Internal, "foobar") + }, + expectedErr: status.Error(codes.Internal, "foobar"), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + transactionServer.vote = func(request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { + return tc.voteFn(t, request) + } + + err := manager.Vote(ctx, tc.transaction, praefect, tc.vote) + require.Equal(t, tc.expectedErr, err) + }) + } } func TestPoolManager_Stop(t *testing.T) { - testhelper.NewFeatureSets([]featureflag.FeatureFlag{ - featureflag.BackchannelVoting, - }).Run(t, func(t *testing.T, ctx context.Context) { - cfg := testcfg.Build(t) - - transactionServer, praefect, stop := runTransactionServer(t, cfg) - defer stop() - - registry := backchannel.NewRegistry() - if featureflag.IsEnabled(ctx, featureflag.BackchannelVoting) { - backchannelConn, err := client.Dial(ctx, praefect.ListenAddr, nil, nil) - require.NoError(t, err) - defer backchannelConn.Close() - praefect = metadata.PraefectServer{BackchannelID: registry.RegisterBackchannel(backchannelConn)} - } - - manager := NewManager(cfg, registry) - - for _, tc := range []struct { - desc string - transaction metadata.Transaction - stopFn func(*testing.T, *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) - expectedErr error - }{ - { - desc: "successful stop", - transaction: metadata.Transaction{ - ID: 1, - Node: "node", - }, - stopFn: func(t *testing.T, request *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) { - require.Equal(t, uint64(1), request.TransactionId) - return &gitalypb.StopTransactionResponse{}, nil - }, + cfg := testcfg.Build(t) + + transactionServer, praefect := runTransactionServer(t, cfg) + + ctx, cancel := testhelper.Context() + defer cancel() + + registry := backchannel.NewRegistry() + backchannelConn, err := client.Dial(ctx, praefect.ListenAddr, nil, nil) + require.NoError(t, err) + defer backchannelConn.Close() + praefect = txinfo.PraefectServer{BackchannelID: registry.RegisterBackchannel(backchannelConn)} + + manager := transaction.NewManager(cfg, registry) + + for _, tc := range []struct { + desc string + transaction txinfo.Transaction + stopFn func(*testing.T, *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) + expectedErr error + }{ + { + desc: "successful stop", + transaction: txinfo.Transaction{ + ID: 1, + Node: "node", + }, + stopFn: func(t *testing.T, request *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) { + require.Equal(t, uint64(1), request.TransactionId) + return &gitalypb.StopTransactionResponse{}, nil }, - { - desc: "erroneous stop", - stopFn: func(t *testing.T, request *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) { - return nil, status.Error(codes.Internal, "foobar") - }, - expectedErr: status.Error(codes.Internal, "foobar"), + }, + { + desc: "erroneous stop", + stopFn: func(t *testing.T, request *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) { + return nil, status.Error(codes.Internal, "foobar") }, - } { - t.Run(tc.desc, func(t *testing.T) { - transactionServer.stop = func(request *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) { - return tc.stopFn(t, request) - } - - err := manager.Stop(ctx, tc.transaction, praefect) - require.Equal(t, tc.expectedErr, err) - }) - } - }) + expectedErr: status.Error(codes.Internal, "foobar"), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + transactionServer.stop = func(request *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) { + return tc.stopFn(t, request) + } + + err := manager.Stop(ctx, tc.transaction, praefect) + require.Equal(t, tc.expectedErr, err) + }) + } } -func runTransactionServer(t *testing.T, cfg config.Cfg) (*testTransactionServer, metadata.PraefectServer, func()) { +func runTransactionServer(t *testing.T, cfg config.Cfg) (*testTransactionServer, txinfo.PraefectServer) { transactionServer := &testTransactionServer{} + cfg.ListenAddr = ":0" // pushes gRPC to listen on the TCP address + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + gitalypb.RegisterRefTransactionServer(srv, transactionServer) + }, testserver.WithDisablePraefect()) - server := testhelper.NewServerWithAuth(t, nil, nil, cfg.Auth.Token, backchannel.NewRegistry(), testhelper.WithInternalSocket(cfg)) - gitalypb.RegisterRefTransactionServer(server.GrpcServer(), transactionServer) - server.Start(t) - - listener, address := testhelper.GetLocalhostListener(t) - go func() { require.NoError(t, server.GrpcServer().Serve(listener)) }() - - praefect := metadata.PraefectServer{ - ListenAddr: "tcp://" + address, + praefect := txinfo.PraefectServer{ + ListenAddr: addr, Token: cfg.Auth.Token, } - return transactionServer, praefect, server.Stop + return transactionServer, praefect } |