diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-01-28 10:28:44 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-01-29 17:00:49 +0300 |
commit | 4b74f048231c8d3614343f9b760513e5d2fb3252 (patch) | |
tree | 6087265499992f74da3bd6b52ec7bf2eadacaacd | |
parent | 145dde08af8b6081de21a170acc15846a15507b9 (diff) |
hook: Extract transaction handling into standalone manager
The hook manager currently hosts all logic to handle transactional
voting. This made sense until now as the reference-transaction hook was
the only place where we would do voting on transactions in the first
place. But we're about to extend transactional support to more places
which are not related to references at all.
As a preparatory step, this commit thus extracts the transaction
handling code into its own transaction manager. To keep the diff small
and focussed on the functional change, the manager is still created
ad-hoc in the hook manager. This is about to change with a subsequent
commit.
-rw-r--r-- | internal/gitaly/hook/manager.go | 24 | ||||
-rw-r--r-- | internal/gitaly/hook/transactions.go | 60 | ||||
-rw-r--r-- | internal/gitaly/transaction/manager.go | 115 | ||||
-rw-r--r-- | internal/gitaly/transaction/manager_test.go | 167 | ||||
-rw-r--r-- | internal/gitaly/transaction/testhelper_test.go | 21 |
5 files changed, 320 insertions, 67 deletions
diff --git a/internal/gitaly/hook/manager.go b/internal/gitaly/hook/manager.go index 77ac000bb..9b67024e9 100644 --- a/internal/gitaly/hook/manager.go +++ b/internal/gitaly/hook/manager.go @@ -5,8 +5,8 @@ import ( "io" "github.com/prometheus/client_golang/prometheus" - "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/internal/storage" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) @@ -49,11 +49,10 @@ type Manager interface { // GitLabHookManager is a hook manager containing Git hook business logic. It // uses the GitLab API to authenticate and track ongoing hook calls. type GitLabHookManager struct { - locator storage.Locator - gitlabAPI GitlabAPI - hooksConfig config.Hooks - conns *client.Pool - votingDelayMetric prometheus.Histogram + locator storage.Locator + gitlabAPI GitlabAPI + hooksConfig config.Hooks + txManager *transaction.PoolManager } // NewManager returns a new hook manager @@ -62,21 +61,14 @@ func NewManager(locator storage.Locator, gitlabAPI GitlabAPI, cfg config.Cfg) *G locator: locator, gitlabAPI: gitlabAPI, hooksConfig: cfg.Hooks, - conns: client.NewPoolWithOptions(client.WithDialOptions(client.FailOnNonTempDialError()...)), - votingDelayMetric: prometheus.NewHistogram( - prometheus.HistogramOpts{ - Name: "gitaly_hook_transaction_voting_delay_seconds", - Help: "Delay between calling out to transaction service and receiving a response", - Buckets: cfg.Prometheus.GRPCLatencyBuckets, - }, - ), + txManager: transaction.NewManager(cfg), } } func (m *GitLabHookManager) Describe(descs chan<- *prometheus.Desc) { - prometheus.DescribeByCollect(m, descs) + prometheus.DescribeByCollect(m.txManager, descs) } func (m *GitLabHookManager) Collect(metrics chan<- prometheus.Metric) { - m.votingDelayMetric.Collect(metrics) + m.txManager.Collect(metrics) } diff --git a/internal/gitaly/hook/transactions.go b/internal/gitaly/hook/transactions.go index e12111b0d..9e7ef7324 100644 --- a/internal/gitaly/hook/transactions.go +++ b/internal/gitaly/hook/transactions.go @@ -5,11 +5,8 @@ import ( "errors" "time" - "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata" - "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - "google.golang.org/grpc" ) func isPrimary(payload git.HooksPayload) bool { @@ -19,16 +16,8 @@ func isPrimary(payload git.HooksPayload) bool { return payload.Transaction.Primary } -func (m *GitLabHookManager) getPraefectConn(ctx context.Context, server *metadata.PraefectServer) (*grpc.ClientConn, error) { - address, err := server.Address() - if err != nil { - return nil, err - } - return m.conns.Dial(ctx, address, server.Token) -} - // transactionHandler is a callback invoked on a transaction if it exists. -type transactionHandler func(ctx context.Context, tx metadata.Transaction, client gitalypb.RefTransactionClient) error +type transactionHandler func(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer) error // runWithTransaction runs the given function if the payload identifies a transaction. No error // is returned if no transaction exists. If a transaction exists and the function is executed on it, @@ -37,18 +26,14 @@ func (m *GitLabHookManager) runWithTransaction(ctx context.Context, payload git. if payload.Transaction == nil { return nil } + if payload.Praefect == nil { + return errors.New("transaction without Praefect server") + } ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - praefectConn, err := m.getPraefectConn(ctx, payload.Praefect) - if err != nil { - return err - } - - praefectClient := gitalypb.NewRefTransactionClient(praefectConn) - - if err := handler(ctx, *payload.Transaction, praefectClient); err != nil { + if err := handler(ctx, *payload.Transaction, *payload.Praefect); err != nil { return err } @@ -56,40 +41,13 @@ func (m *GitLabHookManager) runWithTransaction(ctx context.Context, payload git. } func (m *GitLabHookManager) voteOnTransaction(ctx context.Context, hash []byte, payload git.HooksPayload) error { - return m.runWithTransaction(ctx, payload, func(ctx context.Context, tx metadata.Transaction, client gitalypb.RefTransactionClient) error { - defer prometheus.NewTimer(m.votingDelayMetric).ObserveDuration() - - response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ - TransactionId: tx.ID, - Node: tx.Node, - ReferenceUpdatesHash: hash, - }) - if err != nil { - return err - } - - switch response.State { - case gitalypb.VoteTransactionResponse_COMMIT: - return nil - case gitalypb.VoteTransactionResponse_ABORT: - return errors.New("transaction was aborted") - case gitalypb.VoteTransactionResponse_STOP: - return errors.New("transaction was stopped") - default: - return errors.New("invalid transaction state") - } + return m.runWithTransaction(ctx, payload, func(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer) error { + return m.txManager.Vote(ctx, tx, praefect, hash) }) } func (m *GitLabHookManager) stopTransaction(ctx context.Context, payload git.HooksPayload) error { - return m.runWithTransaction(ctx, payload, func(ctx context.Context, tx metadata.Transaction, client gitalypb.RefTransactionClient) error { - _, err := client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ - TransactionId: tx.ID, - }) - if err != nil { - return err - } - - return nil + return m.runWithTransaction(ctx, payload, func(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer) error { + return m.txManager.Stop(ctx, tx, praefect) }) } diff --git a/internal/gitaly/transaction/manager.go b/internal/gitaly/transaction/manager.go new file mode 100644 index 000000000..3b7ab1707 --- /dev/null +++ b/internal/gitaly/transaction/manager.go @@ -0,0 +1,115 @@ +package transaction + +import ( + "context" + "errors" + + "github.com/prometheus/client_golang/prometheus" + "gitlab.com/gitlab-org/gitaly/client" + "gitlab.com/gitlab-org/gitaly/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" +) + +// Manager is an interface for handling voting on transactions. +type Manager interface { + // Vote casts a vote on the given transaction which is hosted by the + // given Praefect server. + Vote(context.Context, metadata.Transaction, metadata.PraefectServer, []byte) error + + // Stop gracefully stops the given transaction which is hosted by the + // given Praefect server. + Stop(context.Context, metadata.Transaction, metadata.PraefectServer) error +} + +// PoolManager is an implementation of the Manager interface using a pool to +// connect to the transaction hosts. +type PoolManager struct { + conns *client.Pool + votingDelayMetric prometheus.Histogram +} + +// NewManager creates a new PoolManager to handle transactional voting. +func NewManager(cfg config.Cfg) *PoolManager { + return &PoolManager{ + conns: client.NewPoolWithOptions(client.WithDialOptions(client.FailOnNonTempDialError()...)), + votingDelayMetric: prometheus.NewHistogram( + prometheus.HistogramOpts{ + Name: "gitaly_hook_transaction_voting_delay_seconds", + Help: "Delay between calling out to transaction service and receiving a response", + Buckets: cfg.Prometheus.GRPCLatencyBuckets, + }, + ), + } +} + +// Describe is used to describe Prometheus metrics. +func (m *PoolManager) Describe(descs chan<- *prometheus.Desc) { + prometheus.DescribeByCollect(m, descs) +} + +// Collect is used to collect Prometheus metrics. +func (m *PoolManager) Collect(metrics chan<- prometheus.Metric) { + m.votingDelayMetric.Collect(metrics) +} + +func (m *PoolManager) getTransactionClient(ctx context.Context, server metadata.PraefectServer) (gitalypb.RefTransactionClient, error) { + address, err := server.Address() + if err != nil { + return nil, err + } + + conn, err := m.conns.Dial(ctx, address, server.Token) + if err != nil { + return nil, err + } + + return gitalypb.NewRefTransactionClient(conn), nil +} + +// Vote connects to the given server and casts hash as a vote for the +// transaction identified by tx. +func (m *PoolManager) Vote(ctx context.Context, tx metadata.Transaction, server metadata.PraefectServer, hash []byte) error { + client, err := m.getTransactionClient(ctx, server) + if err != nil { + return err + } + + defer prometheus.NewTimer(m.votingDelayMetric).ObserveDuration() + + response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + TransactionId: tx.ID, + Node: tx.Node, + ReferenceUpdatesHash: hash, + }) + if err != nil { + return err + } + + switch response.State { + case gitalypb.VoteTransactionResponse_COMMIT: + return nil + case gitalypb.VoteTransactionResponse_ABORT: + return errors.New("transaction was aborted") + case gitalypb.VoteTransactionResponse_STOP: + return errors.New("transaction was stopped") + default: + return errors.New("invalid transaction state") + } +} + +// Stop connects to the given server and stops the transaction identified by tx. +func (m *PoolManager) Stop(ctx context.Context, tx metadata.Transaction, server metadata.PraefectServer) error { + client, err := m.getTransactionClient(ctx, server) + if err != nil { + return err + } + + if _, err := client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ + TransactionId: tx.ID, + }); err != nil { + return err + } + + return nil +} diff --git a/internal/gitaly/transaction/manager_test.go b/internal/gitaly/transaction/manager_test.go new file mode 100644 index 000000000..bbe552ef8 --- /dev/null +++ b/internal/gitaly/transaction/manager_test.go @@ -0,0 +1,167 @@ +package transaction + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type testTransactionServer struct { + vote func(*gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) + stop func(*gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) +} + +func (s *testTransactionServer) VoteTransaction(ctx context.Context, in *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { + if s.vote != nil { + return s.vote(in) + } + return nil, nil +} + +func (s *testTransactionServer) StopTransaction(ctx context.Context, in *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) { + if s.stop != nil { + return s.stop(in) + } + return nil, nil +} + +func TestPoolManager_Vote(t *testing.T) { + transactionServer, praefect, stop := runTransactionServer(t) + defer stop() + + manager := NewManager(config.Config) + + ctx, cancel := testhelper.Context() + defer cancel() + + for _, tc := range []struct { + desc string + transaction metadata.Transaction + vote []byte + voteFn func(*testing.T, *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) + expectedErr error + }{ + { + desc: "successful vote", + transaction: metadata.Transaction{ + ID: 1, + Node: "node", + }, + vote: []byte("foobar"), + voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { + require.Equal(t, uint64(1), request.TransactionId) + require.Equal(t, "node", request.Node) + require.Equal(t, request.ReferenceUpdatesHash, []byte("foobar")) + + return &gitalypb.VoteTransactionResponse{ + State: gitalypb.VoteTransactionResponse_COMMIT, + }, nil + }, + }, + { + desc: "aborted vote", + voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { + return &gitalypb.VoteTransactionResponse{ + State: gitalypb.VoteTransactionResponse_ABORT, + }, nil + }, + expectedErr: errors.New("transaction was aborted"), + }, + { + desc: "stopped vote", + voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { + return &gitalypb.VoteTransactionResponse{ + State: gitalypb.VoteTransactionResponse_STOP, + }, nil + }, + expectedErr: errors.New("transaction was stopped"), + }, + { + desc: "erroneous vote", + voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { + return nil, status.Error(codes.Internal, "foobar") + }, + expectedErr: status.Error(codes.Internal, "foobar"), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + transactionServer.vote = func(request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { + return tc.voteFn(t, request) + } + + err := manager.Vote(ctx, tc.transaction, praefect, tc.vote) + require.Equal(t, tc.expectedErr, err) + }) + } +} + +func TestPoolManager_Stop(t *testing.T) { + transactionServer, praefect, stop := runTransactionServer(t) + defer stop() + + manager := NewManager(config.Config) + + ctx, cancel := testhelper.Context() + defer cancel() + + for _, tc := range []struct { + desc string + transaction metadata.Transaction + stopFn func(*testing.T, *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) + expectedErr error + }{ + { + desc: "successful stop", + transaction: metadata.Transaction{ + ID: 1, + Node: "node", + }, + stopFn: func(t *testing.T, request *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) { + require.Equal(t, uint64(1), request.TransactionId) + return &gitalypb.StopTransactionResponse{}, nil + }, + }, + { + desc: "erroneous stop", + stopFn: func(t *testing.T, request *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) { + return nil, status.Error(codes.Internal, "foobar") + }, + expectedErr: status.Error(codes.Internal, "foobar"), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + transactionServer.stop = func(request *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) { + return tc.stopFn(t, request) + } + + err := manager.Stop(ctx, tc.transaction, praefect) + require.Equal(t, tc.expectedErr, err) + }) + } +} + +func runTransactionServer(t *testing.T) (*testTransactionServer, metadata.PraefectServer, func()) { + transactionServer := &testTransactionServer{} + + server := testhelper.NewServerWithAuth(t, nil, nil, config.Config.Auth.Token, testhelper.WithInternalSocket(config.Config)) + gitalypb.RegisterRefTransactionServer(server.GrpcServer(), transactionServer) + server.Start(t) + + listener, address := testhelper.GetLocalhostListener(t) + go func() { require.NoError(t, server.GrpcServer().Serve(listener)) }() + + praefect := metadata.PraefectServer{ + ListenAddr: "tcp://" + address, + Token: config.Config.Auth.Token, + } + + return transactionServer, praefect, server.Stop +} diff --git a/internal/gitaly/transaction/testhelper_test.go b/internal/gitaly/transaction/testhelper_test.go new file mode 100644 index 000000000..e3d6dc85e --- /dev/null +++ b/internal/gitaly/transaction/testhelper_test.go @@ -0,0 +1,21 @@ +package transaction + +import ( + "os" + "testing" + + "gitlab.com/gitlab-org/gitaly/internal/testhelper" +) + +func TestMain(m *testing.M) { + os.Exit(testMain(m)) +} + +func testMain(m *testing.M) int { + defer testhelper.MustHaveNoChildProcess() + + cleanup := testhelper.Configure() + defer cleanup() + + return m.Run() +} |