Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal/gitaly/transaction/manager_test.go')
-rw-r--r--internal/gitaly/transaction/manager_test.go271
1 files changed, 132 insertions, 139 deletions
diff --git a/internal/gitaly/transaction/manager_test.go b/internal/gitaly/transaction/manager_test.go
index 4e4117ed0..0ed6773d0 100644
--- a/internal/gitaly/transaction/manager_test.go
+++ b/internal/gitaly/transaction/manager_test.go
@@ -1,4 +1,4 @@
-package transaction
+package transaction_test
import (
"context"
@@ -9,11 +9,15 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/backchannel"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/config"
- "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/service"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/internal/transaction/txinfo"
+ "gitlab.com/gitlab-org/gitaly/internal/transaction/voting"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -38,156 +42,145 @@ func (s *testTransactionServer) StopTransaction(ctx context.Context, in *gitalyp
}
func TestPoolManager_Vote(t *testing.T) {
- testhelper.NewFeatureSets([]featureflag.FeatureFlag{
- featureflag.BackchannelVoting,
- }).Run(t, func(t *testing.T, ctx context.Context) {
- cfg := testcfg.Build(t)
-
- transactionServer, praefect, stop := runTransactionServer(t, cfg)
- defer stop()
-
- registry := backchannel.NewRegistry()
- if featureflag.IsEnabled(ctx, featureflag.BackchannelVoting) {
- backchannelConn, err := client.Dial(ctx, praefect.ListenAddr, nil, nil)
- require.NoError(t, err)
- defer backchannelConn.Close()
- praefect = metadata.PraefectServer{BackchannelID: registry.RegisterBackchannel(backchannelConn)}
- }
-
- manager := NewManager(cfg, registry)
-
- for _, tc := range []struct {
- desc string
- transaction metadata.Transaction
- vote Vote
- voteFn func(*testing.T, *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error)
- expectedErr error
- }{
- {
- desc: "successful vote",
- transaction: metadata.Transaction{
- ID: 1,
- Node: "node",
- },
- vote: VoteFromData([]byte("foobar")),
- voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
- require.Equal(t, uint64(1), request.TransactionId)
- require.Equal(t, "node", request.Node)
- require.Equal(t, request.ReferenceUpdatesHash, VoteFromData([]byte("foobar")).Bytes())
-
- return &gitalypb.VoteTransactionResponse{
- State: gitalypb.VoteTransactionResponse_COMMIT,
- }, nil
- },
+ cfg := testcfg.Build(t)
+
+ transactionServer, praefect := runTransactionServer(t, cfg)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ registry := backchannel.NewRegistry()
+ backchannelConn, err := client.Dial(ctx, praefect.ListenAddr, nil, nil)
+ require.NoError(t, err)
+ defer backchannelConn.Close()
+ praefect = txinfo.PraefectServer{BackchannelID: registry.RegisterBackchannel(backchannelConn)}
+
+ manager := transaction.NewManager(cfg, registry)
+
+ for _, tc := range []struct {
+ desc string
+ transaction txinfo.Transaction
+ vote voting.Vote
+ voteFn func(*testing.T, *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error)
+ expectedErr error
+ }{
+ {
+ desc: "successful vote",
+ transaction: txinfo.Transaction{
+ ID: 1,
+ Node: "node",
},
- {
- desc: "aborted vote",
- voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
- return &gitalypb.VoteTransactionResponse{
- State: gitalypb.VoteTransactionResponse_ABORT,
- }, nil
- },
- expectedErr: errors.New("transaction was aborted"),
+ vote: voting.VoteFromData([]byte("foobar")),
+ voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
+ require.Equal(t, uint64(1), request.TransactionId)
+ require.Equal(t, "node", request.Node)
+ require.Equal(t, request.ReferenceUpdatesHash, voting.VoteFromData([]byte("foobar")).Bytes())
+
+ return &gitalypb.VoteTransactionResponse{
+ State: gitalypb.VoteTransactionResponse_COMMIT,
+ }, nil
},
- {
- desc: "stopped vote",
- voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
- return &gitalypb.VoteTransactionResponse{
- State: gitalypb.VoteTransactionResponse_STOP,
- }, nil
- },
- expectedErr: errors.New("transaction was stopped"),
+ },
+ {
+ desc: "aborted vote",
+ voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
+ return &gitalypb.VoteTransactionResponse{
+ State: gitalypb.VoteTransactionResponse_ABORT,
+ }, nil
},
- {
- desc: "erroneous vote",
- voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
- return nil, status.Error(codes.Internal, "foobar")
- },
- expectedErr: status.Error(codes.Internal, "foobar"),
+ expectedErr: errors.New("transaction was aborted"),
+ },
+ {
+ desc: "stopped vote",
+ voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
+ return &gitalypb.VoteTransactionResponse{
+ State: gitalypb.VoteTransactionResponse_STOP,
+ }, nil
},
- } {
- t.Run(tc.desc, func(t *testing.T) {
- transactionServer.vote = func(request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
- return tc.voteFn(t, request)
- }
-
- err := manager.Vote(ctx, tc.transaction, praefect, tc.vote)
- require.Equal(t, tc.expectedErr, err)
- })
- }
- })
+ expectedErr: errors.New("transaction was stopped"),
+ },
+ {
+ desc: "erroneous vote",
+ voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
+ return nil, status.Error(codes.Internal, "foobar")
+ },
+ expectedErr: status.Error(codes.Internal, "foobar"),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ transactionServer.vote = func(request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
+ return tc.voteFn(t, request)
+ }
+
+ err := manager.Vote(ctx, tc.transaction, praefect, tc.vote)
+ require.Equal(t, tc.expectedErr, err)
+ })
+ }
}
func TestPoolManager_Stop(t *testing.T) {
- testhelper.NewFeatureSets([]featureflag.FeatureFlag{
- featureflag.BackchannelVoting,
- }).Run(t, func(t *testing.T, ctx context.Context) {
- cfg := testcfg.Build(t)
-
- transactionServer, praefect, stop := runTransactionServer(t, cfg)
- defer stop()
-
- registry := backchannel.NewRegistry()
- if featureflag.IsEnabled(ctx, featureflag.BackchannelVoting) {
- backchannelConn, err := client.Dial(ctx, praefect.ListenAddr, nil, nil)
- require.NoError(t, err)
- defer backchannelConn.Close()
- praefect = metadata.PraefectServer{BackchannelID: registry.RegisterBackchannel(backchannelConn)}
- }
-
- manager := NewManager(cfg, registry)
-
- for _, tc := range []struct {
- desc string
- transaction metadata.Transaction
- stopFn func(*testing.T, *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error)
- expectedErr error
- }{
- {
- desc: "successful stop",
- transaction: metadata.Transaction{
- ID: 1,
- Node: "node",
- },
- stopFn: func(t *testing.T, request *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) {
- require.Equal(t, uint64(1), request.TransactionId)
- return &gitalypb.StopTransactionResponse{}, nil
- },
+ cfg := testcfg.Build(t)
+
+ transactionServer, praefect := runTransactionServer(t, cfg)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ registry := backchannel.NewRegistry()
+ backchannelConn, err := client.Dial(ctx, praefect.ListenAddr, nil, nil)
+ require.NoError(t, err)
+ defer backchannelConn.Close()
+ praefect = txinfo.PraefectServer{BackchannelID: registry.RegisterBackchannel(backchannelConn)}
+
+ manager := transaction.NewManager(cfg, registry)
+
+ for _, tc := range []struct {
+ desc string
+ transaction txinfo.Transaction
+ stopFn func(*testing.T, *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error)
+ expectedErr error
+ }{
+ {
+ desc: "successful stop",
+ transaction: txinfo.Transaction{
+ ID: 1,
+ Node: "node",
+ },
+ stopFn: func(t *testing.T, request *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) {
+ require.Equal(t, uint64(1), request.TransactionId)
+ return &gitalypb.StopTransactionResponse{}, nil
},
- {
- desc: "erroneous stop",
- stopFn: func(t *testing.T, request *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) {
- return nil, status.Error(codes.Internal, "foobar")
- },
- expectedErr: status.Error(codes.Internal, "foobar"),
+ },
+ {
+ desc: "erroneous stop",
+ stopFn: func(t *testing.T, request *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) {
+ return nil, status.Error(codes.Internal, "foobar")
},
- } {
- t.Run(tc.desc, func(t *testing.T) {
- transactionServer.stop = func(request *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) {
- return tc.stopFn(t, request)
- }
-
- err := manager.Stop(ctx, tc.transaction, praefect)
- require.Equal(t, tc.expectedErr, err)
- })
- }
- })
+ expectedErr: status.Error(codes.Internal, "foobar"),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ transactionServer.stop = func(request *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) {
+ return tc.stopFn(t, request)
+ }
+
+ err := manager.Stop(ctx, tc.transaction, praefect)
+ require.Equal(t, tc.expectedErr, err)
+ })
+ }
}
-func runTransactionServer(t *testing.T, cfg config.Cfg) (*testTransactionServer, metadata.PraefectServer, func()) {
+func runTransactionServer(t *testing.T, cfg config.Cfg) (*testTransactionServer, txinfo.PraefectServer) {
transactionServer := &testTransactionServer{}
+ cfg.ListenAddr = ":0" // pushes gRPC to listen on the TCP address
+ addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) {
+ gitalypb.RegisterRefTransactionServer(srv, transactionServer)
+ }, testserver.WithDisablePraefect())
- server := testhelper.NewServerWithAuth(t, nil, nil, cfg.Auth.Token, backchannel.NewRegistry(), testhelper.WithInternalSocket(cfg))
- gitalypb.RegisterRefTransactionServer(server.GrpcServer(), transactionServer)
- server.Start(t)
-
- listener, address := testhelper.GetLocalhostListener(t)
- go func() { require.NoError(t, server.GrpcServer().Serve(listener)) }()
-
- praefect := metadata.PraefectServer{
- ListenAddr: "tcp://" + address,
+ praefect := txinfo.PraefectServer{
+ ListenAddr: addr,
Token: cfg.Auth.Token,
}
- return transactionServer, praefect, server.Stop
+ return transactionServer, praefect
}