Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-01-28 10:28:44 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-01-29 17:00:49 +0300
commit4b74f048231c8d3614343f9b760513e5d2fb3252 (patch)
tree6087265499992f74da3bd6b52ec7bf2eadacaacd
parent145dde08af8b6081de21a170acc15846a15507b9 (diff)
hook: Extract transaction handling into standalone manager
The hook manager currently hosts all logic to handle transactional voting. This made sense until now as the reference-transaction hook was the only place where we would do voting on transactions in the first place. But we're about to extend transactional support to more places which are not related to references at all. As a preparatory step, this commit thus extracts the transaction handling code into its own transaction manager. To keep the diff small and focussed on the functional change, the manager is still created ad-hoc in the hook manager. This is about to change with a subsequent commit.
-rw-r--r--internal/gitaly/hook/manager.go24
-rw-r--r--internal/gitaly/hook/transactions.go60
-rw-r--r--internal/gitaly/transaction/manager.go115
-rw-r--r--internal/gitaly/transaction/manager_test.go167
-rw-r--r--internal/gitaly/transaction/testhelper_test.go21
5 files changed, 320 insertions, 67 deletions
diff --git a/internal/gitaly/hook/manager.go b/internal/gitaly/hook/manager.go
index 77ac000bb..9b67024e9 100644
--- a/internal/gitaly/hook/manager.go
+++ b/internal/gitaly/hook/manager.go
@@ -5,8 +5,8 @@ import (
"io"
"github.com/prometheus/client_golang/prometheus"
- "gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/internal/storage"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
@@ -49,11 +49,10 @@ type Manager interface {
// GitLabHookManager is a hook manager containing Git hook business logic. It
// uses the GitLab API to authenticate and track ongoing hook calls.
type GitLabHookManager struct {
- locator storage.Locator
- gitlabAPI GitlabAPI
- hooksConfig config.Hooks
- conns *client.Pool
- votingDelayMetric prometheus.Histogram
+ locator storage.Locator
+ gitlabAPI GitlabAPI
+ hooksConfig config.Hooks
+ txManager *transaction.PoolManager
}
// NewManager returns a new hook manager
@@ -62,21 +61,14 @@ func NewManager(locator storage.Locator, gitlabAPI GitlabAPI, cfg config.Cfg) *G
locator: locator,
gitlabAPI: gitlabAPI,
hooksConfig: cfg.Hooks,
- conns: client.NewPoolWithOptions(client.WithDialOptions(client.FailOnNonTempDialError()...)),
- votingDelayMetric: prometheus.NewHistogram(
- prometheus.HistogramOpts{
- Name: "gitaly_hook_transaction_voting_delay_seconds",
- Help: "Delay between calling out to transaction service and receiving a response",
- Buckets: cfg.Prometheus.GRPCLatencyBuckets,
- },
- ),
+ txManager: transaction.NewManager(cfg),
}
}
func (m *GitLabHookManager) Describe(descs chan<- *prometheus.Desc) {
- prometheus.DescribeByCollect(m, descs)
+ prometheus.DescribeByCollect(m.txManager, descs)
}
func (m *GitLabHookManager) Collect(metrics chan<- prometheus.Metric) {
- m.votingDelayMetric.Collect(metrics)
+ m.txManager.Collect(metrics)
}
diff --git a/internal/gitaly/hook/transactions.go b/internal/gitaly/hook/transactions.go
index e12111b0d..9e7ef7324 100644
--- a/internal/gitaly/hook/transactions.go
+++ b/internal/gitaly/hook/transactions.go
@@ -5,11 +5,8 @@ import (
"errors"
"time"
- "github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metadata"
- "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
- "google.golang.org/grpc"
)
func isPrimary(payload git.HooksPayload) bool {
@@ -19,16 +16,8 @@ func isPrimary(payload git.HooksPayload) bool {
return payload.Transaction.Primary
}
-func (m *GitLabHookManager) getPraefectConn(ctx context.Context, server *metadata.PraefectServer) (*grpc.ClientConn, error) {
- address, err := server.Address()
- if err != nil {
- return nil, err
- }
- return m.conns.Dial(ctx, address, server.Token)
-}
-
// transactionHandler is a callback invoked on a transaction if it exists.
-type transactionHandler func(ctx context.Context, tx metadata.Transaction, client gitalypb.RefTransactionClient) error
+type transactionHandler func(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer) error
// runWithTransaction runs the given function if the payload identifies a transaction. No error
// is returned if no transaction exists. If a transaction exists and the function is executed on it,
@@ -37,18 +26,14 @@ func (m *GitLabHookManager) runWithTransaction(ctx context.Context, payload git.
if payload.Transaction == nil {
return nil
}
+ if payload.Praefect == nil {
+ return errors.New("transaction without Praefect server")
+ }
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
- praefectConn, err := m.getPraefectConn(ctx, payload.Praefect)
- if err != nil {
- return err
- }
-
- praefectClient := gitalypb.NewRefTransactionClient(praefectConn)
-
- if err := handler(ctx, *payload.Transaction, praefectClient); err != nil {
+ if err := handler(ctx, *payload.Transaction, *payload.Praefect); err != nil {
return err
}
@@ -56,40 +41,13 @@ func (m *GitLabHookManager) runWithTransaction(ctx context.Context, payload git.
}
func (m *GitLabHookManager) voteOnTransaction(ctx context.Context, hash []byte, payload git.HooksPayload) error {
- return m.runWithTransaction(ctx, payload, func(ctx context.Context, tx metadata.Transaction, client gitalypb.RefTransactionClient) error {
- defer prometheus.NewTimer(m.votingDelayMetric).ObserveDuration()
-
- response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
- TransactionId: tx.ID,
- Node: tx.Node,
- ReferenceUpdatesHash: hash,
- })
- if err != nil {
- return err
- }
-
- switch response.State {
- case gitalypb.VoteTransactionResponse_COMMIT:
- return nil
- case gitalypb.VoteTransactionResponse_ABORT:
- return errors.New("transaction was aborted")
- case gitalypb.VoteTransactionResponse_STOP:
- return errors.New("transaction was stopped")
- default:
- return errors.New("invalid transaction state")
- }
+ return m.runWithTransaction(ctx, payload, func(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer) error {
+ return m.txManager.Vote(ctx, tx, praefect, hash)
})
}
func (m *GitLabHookManager) stopTransaction(ctx context.Context, payload git.HooksPayload) error {
- return m.runWithTransaction(ctx, payload, func(ctx context.Context, tx metadata.Transaction, client gitalypb.RefTransactionClient) error {
- _, err := client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
- TransactionId: tx.ID,
- })
- if err != nil {
- return err
- }
-
- return nil
+ return m.runWithTransaction(ctx, payload, func(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer) error {
+ return m.txManager.Stop(ctx, tx, praefect)
})
}
diff --git a/internal/gitaly/transaction/manager.go b/internal/gitaly/transaction/manager.go
new file mode 100644
index 000000000..3b7ab1707
--- /dev/null
+++ b/internal/gitaly/transaction/manager.go
@@ -0,0 +1,115 @@
+package transaction
+
+import (
+ "context"
+ "errors"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "gitlab.com/gitlab-org/gitaly/client"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+)
+
+// Manager is an interface for handling voting on transactions.
+type Manager interface {
+ // Vote casts a vote on the given transaction which is hosted by the
+ // given Praefect server.
+ Vote(context.Context, metadata.Transaction, metadata.PraefectServer, []byte) error
+
+ // Stop gracefully stops the given transaction which is hosted by the
+ // given Praefect server.
+ Stop(context.Context, metadata.Transaction, metadata.PraefectServer) error
+}
+
+// PoolManager is an implementation of the Manager interface using a pool to
+// connect to the transaction hosts.
+type PoolManager struct {
+ conns *client.Pool
+ votingDelayMetric prometheus.Histogram
+}
+
+// NewManager creates a new PoolManager to handle transactional voting.
+func NewManager(cfg config.Cfg) *PoolManager {
+ return &PoolManager{
+ conns: client.NewPoolWithOptions(client.WithDialOptions(client.FailOnNonTempDialError()...)),
+ votingDelayMetric: prometheus.NewHistogram(
+ prometheus.HistogramOpts{
+ Name: "gitaly_hook_transaction_voting_delay_seconds",
+ Help: "Delay between calling out to transaction service and receiving a response",
+ Buckets: cfg.Prometheus.GRPCLatencyBuckets,
+ },
+ ),
+ }
+}
+
+// Describe is used to describe Prometheus metrics.
+func (m *PoolManager) Describe(descs chan<- *prometheus.Desc) {
+ prometheus.DescribeByCollect(m, descs)
+}
+
+// Collect is used to collect Prometheus metrics.
+func (m *PoolManager) Collect(metrics chan<- prometheus.Metric) {
+ m.votingDelayMetric.Collect(metrics)
+}
+
+func (m *PoolManager) getTransactionClient(ctx context.Context, server metadata.PraefectServer) (gitalypb.RefTransactionClient, error) {
+ address, err := server.Address()
+ if err != nil {
+ return nil, err
+ }
+
+ conn, err := m.conns.Dial(ctx, address, server.Token)
+ if err != nil {
+ return nil, err
+ }
+
+ return gitalypb.NewRefTransactionClient(conn), nil
+}
+
+// Vote connects to the given server and casts hash as a vote for the
+// transaction identified by tx.
+func (m *PoolManager) Vote(ctx context.Context, tx metadata.Transaction, server metadata.PraefectServer, hash []byte) error {
+ client, err := m.getTransactionClient(ctx, server)
+ if err != nil {
+ return err
+ }
+
+ defer prometheus.NewTimer(m.votingDelayMetric).ObserveDuration()
+
+ response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: tx.ID,
+ Node: tx.Node,
+ ReferenceUpdatesHash: hash,
+ })
+ if err != nil {
+ return err
+ }
+
+ switch response.State {
+ case gitalypb.VoteTransactionResponse_COMMIT:
+ return nil
+ case gitalypb.VoteTransactionResponse_ABORT:
+ return errors.New("transaction was aborted")
+ case gitalypb.VoteTransactionResponse_STOP:
+ return errors.New("transaction was stopped")
+ default:
+ return errors.New("invalid transaction state")
+ }
+}
+
+// Stop connects to the given server and stops the transaction identified by tx.
+func (m *PoolManager) Stop(ctx context.Context, tx metadata.Transaction, server metadata.PraefectServer) error {
+ client, err := m.getTransactionClient(ctx, server)
+ if err != nil {
+ return err
+ }
+
+ if _, err := client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
+ TransactionId: tx.ID,
+ }); err != nil {
+ return err
+ }
+
+ return nil
+}
diff --git a/internal/gitaly/transaction/manager_test.go b/internal/gitaly/transaction/manager_test.go
new file mode 100644
index 000000000..bbe552ef8
--- /dev/null
+++ b/internal/gitaly/transaction/manager_test.go
@@ -0,0 +1,167 @@
+package transaction
+
+import (
+ "context"
+ "errors"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+type testTransactionServer struct {
+ vote func(*gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error)
+ stop func(*gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error)
+}
+
+func (s *testTransactionServer) VoteTransaction(ctx context.Context, in *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
+ if s.vote != nil {
+ return s.vote(in)
+ }
+ return nil, nil
+}
+
+func (s *testTransactionServer) StopTransaction(ctx context.Context, in *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) {
+ if s.stop != nil {
+ return s.stop(in)
+ }
+ return nil, nil
+}
+
+func TestPoolManager_Vote(t *testing.T) {
+ transactionServer, praefect, stop := runTransactionServer(t)
+ defer stop()
+
+ manager := NewManager(config.Config)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ for _, tc := range []struct {
+ desc string
+ transaction metadata.Transaction
+ vote []byte
+ voteFn func(*testing.T, *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error)
+ expectedErr error
+ }{
+ {
+ desc: "successful vote",
+ transaction: metadata.Transaction{
+ ID: 1,
+ Node: "node",
+ },
+ vote: []byte("foobar"),
+ voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
+ require.Equal(t, uint64(1), request.TransactionId)
+ require.Equal(t, "node", request.Node)
+ require.Equal(t, request.ReferenceUpdatesHash, []byte("foobar"))
+
+ return &gitalypb.VoteTransactionResponse{
+ State: gitalypb.VoteTransactionResponse_COMMIT,
+ }, nil
+ },
+ },
+ {
+ desc: "aborted vote",
+ voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
+ return &gitalypb.VoteTransactionResponse{
+ State: gitalypb.VoteTransactionResponse_ABORT,
+ }, nil
+ },
+ expectedErr: errors.New("transaction was aborted"),
+ },
+ {
+ desc: "stopped vote",
+ voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
+ return &gitalypb.VoteTransactionResponse{
+ State: gitalypb.VoteTransactionResponse_STOP,
+ }, nil
+ },
+ expectedErr: errors.New("transaction was stopped"),
+ },
+ {
+ desc: "erroneous vote",
+ voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
+ return nil, status.Error(codes.Internal, "foobar")
+ },
+ expectedErr: status.Error(codes.Internal, "foobar"),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ transactionServer.vote = func(request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
+ return tc.voteFn(t, request)
+ }
+
+ err := manager.Vote(ctx, tc.transaction, praefect, tc.vote)
+ require.Equal(t, tc.expectedErr, err)
+ })
+ }
+}
+
+func TestPoolManager_Stop(t *testing.T) {
+ transactionServer, praefect, stop := runTransactionServer(t)
+ defer stop()
+
+ manager := NewManager(config.Config)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ for _, tc := range []struct {
+ desc string
+ transaction metadata.Transaction
+ stopFn func(*testing.T, *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error)
+ expectedErr error
+ }{
+ {
+ desc: "successful stop",
+ transaction: metadata.Transaction{
+ ID: 1,
+ Node: "node",
+ },
+ stopFn: func(t *testing.T, request *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) {
+ require.Equal(t, uint64(1), request.TransactionId)
+ return &gitalypb.StopTransactionResponse{}, nil
+ },
+ },
+ {
+ desc: "erroneous stop",
+ stopFn: func(t *testing.T, request *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) {
+ return nil, status.Error(codes.Internal, "foobar")
+ },
+ expectedErr: status.Error(codes.Internal, "foobar"),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ transactionServer.stop = func(request *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) {
+ return tc.stopFn(t, request)
+ }
+
+ err := manager.Stop(ctx, tc.transaction, praefect)
+ require.Equal(t, tc.expectedErr, err)
+ })
+ }
+}
+
+func runTransactionServer(t *testing.T) (*testTransactionServer, metadata.PraefectServer, func()) {
+ transactionServer := &testTransactionServer{}
+
+ server := testhelper.NewServerWithAuth(t, nil, nil, config.Config.Auth.Token, testhelper.WithInternalSocket(config.Config))
+ gitalypb.RegisterRefTransactionServer(server.GrpcServer(), transactionServer)
+ server.Start(t)
+
+ listener, address := testhelper.GetLocalhostListener(t)
+ go func() { require.NoError(t, server.GrpcServer().Serve(listener)) }()
+
+ praefect := metadata.PraefectServer{
+ ListenAddr: "tcp://" + address,
+ Token: config.Config.Auth.Token,
+ }
+
+ return transactionServer, praefect, server.Stop
+}
diff --git a/internal/gitaly/transaction/testhelper_test.go b/internal/gitaly/transaction/testhelper_test.go
new file mode 100644
index 000000000..e3d6dc85e
--- /dev/null
+++ b/internal/gitaly/transaction/testhelper_test.go
@@ -0,0 +1,21 @@
+package transaction
+
+import (
+ "os"
+ "testing"
+
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+)
+
+func TestMain(m *testing.M) {
+ os.Exit(testMain(m))
+}
+
+func testMain(m *testing.M) int {
+ defer testhelper.MustHaveNoChildProcess()
+
+ cleanup := testhelper.Configure()
+ defer cleanup()
+
+ return m.Run()
+}